
📈 M15: Observability & Evaluation

This module covers telemetry, regression testing, and semantic evaluation frameworks for multi-agent architectures. You will implement OpenTelemetry (OTel) trace hierarchies using Langfuse, build prompt regression test suites with pytest, and run offline semantic hallucination evaluations using Arize Phoenix.

Training Workspace: ~/AI_BOOTCAMP


🏛️ 1. Architectural Deep Dive: Agentic Telemetry & Evaluation

Unlike single-turn chatbots, autonomous agents execute complex, non-deterministic runs that contain nested loops, database lookups, sandboxed tool executions, and parallel agent dispatches. Basic logging (print statements or single-span logs) cannot capture the hierarchical dependencies between these operations.

A. Hierarchical Telemetry & Context Propagation

Telemetry tools map executions as a Directed Acyclic Graph (DAG) of Spans under a parent Trace.

  • Context Propagation: To preserve parent-child span nesting across async tasks, threads, or separate processes, the gateway passes a context token containing the TraceID and the parent SpanID. Within a process, Python libraries carry this token in contextvars; across process boundaries, it travels in explicit headers such as the W3C traceparent header used by OpenTelemetry.
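The in-process half of this mechanism can be sketched with the standard library alone. The `current_span` variable and the `read_parent` helper below are hypothetical stand-ins, not part of Langfuse or OpenTelemetry. The sketch shows that an asyncio task inherits the creator's context automatically, while a thread-pool worker only sees the parent span when the context is copied explicitly.

```python
import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a tracer's "active span" slot.
current_span = contextvars.ContextVar("current_span", default=None)


def read_parent():
    """Return the span ID visible to whatever code is running right now."""
    return current_span.get()


async def child_task():
    # asyncio tasks copy the creator's context when they are created.
    return read_parent()


async def run_async():
    current_span.set("span-root")
    return await asyncio.create_task(child_task())


def run_threads():
    current_span.set("span-root")
    with ThreadPoolExecutor(max_workers=1) as pool:
        # A plain submit runs in the worker thread's own context.
        lost = pool.submit(read_parent).result()
        # copy_context().run carries the caller's context into the worker.
        ctx = contextvars.copy_context()
        kept = pool.submit(ctx.run, read_parent).result()
    return lost, kept


async_parent = asyncio.run(run_async())
lost_parent, kept_parent = run_threads()
print(async_parent, lost_parent, kept_parent)
```

The `lost_parent` case is what a flat, un-nested trace looks like from the inside: the worker has no parent span to attach to.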

B. Physical Constraints & Telemetry Bottlenecks

  1. Network Telemetry Latency:
    • Sending telemetry payloads synchronously to an external service (e.g., Langfuse SaaS or Datadog) after every span can add significant latency (roughly 100–300 ms per API call).
    • Mitigation: Use asynchronous batch tracing. Telemetry client libraries buffer spans in a local memory queue and dispatch them in batches via background worker threads.
  2. Telemetry Memory Footprint & Queue Overflow:
    • If the agent produces spans faster than the background thread can dispatch them, the local telemetry queue grows without bound and consumes host memory.
    • Mitigation: Implement bounded buffers. When the queue limits are reached, the system must either discard telemetry data (best-effort logging) or block the main thread (guaranteed logging).
  3. Semantic Evaluation Compute Cost:
    • Evaluating responses for hallucinations requires passing the user prompt, retrieved context, and agent output to a "Judge LLM" (the LLM-as-a-judge pattern used by G-Eval). Because the judge re-reads the full exchange, this roughly doubles token consumption and execution cost.
    • Mitigation: Perform evaluations asynchronously out-of-band using small local models (e.g., Llama-3-8B running via Ollama or vLLM) rather than querying expensive proprietary APIs.
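The first two mitigations (asynchronous batching and a bounded buffer) can be illustrated with a minimal standard-library sketch. The `BoundedSpanBuffer` class and its `export` callback are hypothetical and do not mirror any particular SDK's internals. This version takes the best-effort route: when the queue is full, new spans are counted and discarded rather than blocking the caller.

```python
import queue
import threading


class BoundedSpanBuffer:
    """Hypothetical best-effort span buffer: drops new spans when full."""

    def __init__(self, max_size, batch_size, export):
        self._queue = queue.Queue(maxsize=max_size)
        self._batch_size = batch_size
        self._export = export  # callable that receives a list of spans
        self.dropped = 0

    def record(self, span):
        """Called on the hot path; never blocks the caller."""
        try:
            self._queue.put_nowait(span)
        except queue.Full:
            self.dropped += 1  # best-effort: discard instead of blocking

    def drain(self):
        """Export everything currently queued, one batch at a time."""
        while True:
            batch = []
            while len(batch) < self._batch_size:
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    break
            if not batch:
                return
            self._export(batch)


exported = []
buffer = BoundedSpanBuffer(max_size=4, batch_size=2, export=exported.append)

# Six spans arrive before the worker runs; only four fit in the buffer.
for i in range(6):
    buffer.record({"span_id": i})

# The background worker sends the queued spans in batches of two.
worker = threading.Thread(target=buffer.drain)
worker.start()
worker.join()

print(exported, buffer.dropped)
```

Replacing `put_nowait` with a blocking `put` would give the guaranteed-logging variant, at the cost of stalling the agent whenever the exporter falls behind.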

📊 2. Tradeoff Matrix: Telemetry & Evaluation Architectures

| Observability Approach | Latency Impact | Trace Detail | Offline Evaluation Capability | Operational Setup Complexity | Primary Production Bottleneck |
| --- | --- | --- | --- | --- | --- |
| SaaS APM (Langfuse / LLMops) | Low (background async batching) | High (LLM-native schemas + prompt tracking) | Low (requires SaaS connection) | Low (API key authentication) | Network transit bandwidth and rate limits |
| Local OTel (Phoenix / OpenTelemetry) | Low (local network loopback) | High (hierarchical spans + vector visualizations) | High (fully local offline execution) | Moderate (requires hosting a local server) | Host memory, CPU, and storage scaling |
| Traditional APM (Datadog / OpenSearch) | Low (local agent daemon collects metrics) | Low (mainly raw HTTP/DB metadata, not prompt steps) | None | High (requires agent daemon configuration) | Expensive data ingestion pricing models |
| Structured JSON File Logging | Near zero (fast local writes) | Moderate (requires custom log parsing) | None (requires parsing scripts) | Low (files written to local disk) | Disk I/O speed under massive scale |
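To make the last row concrete, here is a minimal sketch of structured JSON logging and the custom parsing it requires. The field names (`trace_id`, `span_id`, `parent_id`) and both helper functions are assumptions chosen for illustration, and an in-memory stream stands in for a log file.

```python
import io
import json


def log_span(stream, trace_id, span_id, parent_id, name, duration_ms):
    """Append one span as a single JSON line."""
    record = {
        "trace_id": trace_id,
        "span_id": span_id,
        "parent_id": parent_id,
        "name": name,
        "duration_ms": duration_ms,
    }
    stream.write(json.dumps(record) + "\n")


def children_by_parent(stream):
    """Parse the log and group span names under their parent span ID."""
    tree = {}
    for line in stream:
        span = json.loads(line)
        tree.setdefault(span["parent_id"], []).append(span["name"])
    return tree


# An in-memory stream stands in for a log file opened in append mode.
log = io.StringIO()
log_span(log, "t1", "s1", None, "agent_main_workflow", 812.0)
log_span(log, "t1", "s2", "s1", "database_lookup", 301.5)

log.seek(0)
tree = children_by_parent(log)
print(tree)
```

Writing is cheap, but the hierarchy only reappears after a parsing pass like `children_by_parent`, which is the work that LLM-native tools do for you.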

🛠️ 3. Step-by-Step Mechanics: Tracing, Testing, & Evals

A. OpenTelemetry Instrumentation with Langfuse

Instrument nested agent functions using decorators to capture execution paths, inputs, outputs, and token costs.

Create ~/AI_BOOTCAMP/labs/observability/tracer.py:

```python
import os
import time

from dotenv import load_dotenv
# Decorator API of the Langfuse Python SDK v2 (langfuse.decorators)
from langfuse.decorators import langfuse_context, observe

load_dotenv()

# Fall back to placeholder credentials for the lab when none are configured
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-mock-key")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-mock-key")
os.environ.setdefault("LANGFUSE_HOST", "http://localhost:3000")


@observe(name="database_lookup")
def query_agent_memory(query: str) -> str:
    """Simulates a database index retrieval span."""
    # Attach input and metadata to the active (innermost) observation
    langfuse_context.update_current_observation(
        input=query,
        metadata={"index_type": "pgvector_hnsw"},
    )
    time.sleep(0.3)  # Simulate DB query delay
    return f"Context: Database verified product availability for '{query}' as in-stock."


@observe(name="response_synthesis", as_type="generation")
def synthesize_response(context: str) -> str:
    """Simulates the LLM call; token usage is recorded on a generation observation."""
    time.sleep(0.5)  # Simulate LLM reasoning latency
    # Log token usage metrics
    langfuse_context.update_current_observation(
        usage={"input": 120, "output": 45}
    )
    return f"Response: {context} Proceeding to order processing."


@observe(name="agent_main_workflow")
def generate_agent_response(user_input: str) -> str:
    """Parent span orchestrating memory retrieval and response synthesis."""
    # Parent context is automatically propagated to nested decorated functions
    context = query_agent_memory(user_input)
    return synthesize_response(context)


if __name__ == "__main__":
    print("Initiating agent run with OTel telemetry...")
    report = generate_agent_response("Hardware License")
    print(f"Workflow complete. Output: {report}")
    # Spans are sent in background batches; flush before the process exits
    langfuse_context.flush()
```

B. Prompt Regression Unit Testing with pytest

Construct unit tests to assert structure, service-level latency limits, and token cost bounds.

Create ~/AI_BOOTCAMP/labs/observability/test_prompt.py:

```python
import time


class AgentResult:
    """Result of one agent run: output text, latency in seconds, cost in USD."""

    def __init__(self, output: str, latency: float, cost: float):
        self.output = output
        self.latency = latency
        self.cost = cost


def run_agent_engine(prompt: str) -> AgentResult:
    """Mock agent pipeline execution simulating prompt processing."""
    # perf_counter is monotonic, so wall-clock adjustments cannot skew the timing
    start_time = time.perf_counter()
    time.sleep(0.5)  # Simulate execution duration
    elapsed = time.perf_counter() - start_time

    return AgentResult(
        output="System Analysis Report: Operations verified. Total calculated cost $12,000.",
        latency=elapsed,
        cost=0.012,
    )


def test_agent_structural_and_perf_bounds():
    test_prompt = "Verify daily performance numbers"
    result = run_agent_engine(test_prompt)

    # Assertions: Content checks
    assert "report" in result.output.lower(), "Verification failed: Output missing required header."
    assert "$" in result.output, "Verification failed: Output missing currency identifier."

    # Assertions: Performance latency SLA (maximum 1.5 seconds)
    assert result.latency <= 1.5, f"Latency SLA violation: Execution took {result.latency:.2f} seconds."

    # Assertions: Cost budget protection
    max_allowable_cost = 0.05
    assert result.cost <= max_allowable_cost, (
        f"Budget exceeded: Run cost of ${result.cost} exceeds limit of ${max_allowable_cost}."
    )
```

C. Offline Semantic Hallucination Evaluation with Arize Phoenix

Deploy a local Arize Phoenix telemetry server to check context relevance and flag hallucinations.

Create ~/AI_BOOTCAMP/labs/observability/phoenix_eval.py:

```python
import os

import pandas as pd
import phoenix as px
from phoenix.evals import HallucinationEvaluator, OpenAIModel, run_evals

# 1. Start local Phoenix telemetry server
session = px.launch_app()
print(f"\n[INFO] Local Arize Phoenix Server active at: {session.url}")

# 2. Construct mock evaluation dataset
# The hallucination template reads the "input", "reference", and "output" columns
eval_dataset = pd.DataFrame({
    "input": ["What was the Q1 production total?"],
    "reference": ["Production log indicates 5,400 units were built in Q1."],
    "output": ["The Q1 production total was 5,400 units."]
})

# 3. Configure the judge model client
# A real OPENAI_API_KEY is needed for the evaluation call to succeed;
# the mock fallback only lets the script start
eval_model = OpenAIModel(
    model="gpt-4o-mini",
    api_key=os.environ.get("OPENAI_API_KEY", "mock-openai-key")
)

# 4. Instantiate semantic hallucination evaluator
hallucination_evaluator = HallucinationEvaluator(eval_model)

print("\nRunning offline semantic evaluation...")
# run_evals returns one DataFrame per evaluator, with label
# ("factual" or "hallucinated"), score, and explanation columns
[evaluation_results] = run_evals(
    dataframe=eval_dataset,
    evaluators=[hallucination_evaluator],
    provide_explanation=True,
)

print("\n=== EVALUATION COMPLETED ===")
print(evaluation_results)
```

🔍 4. Failure Mode Analysis (FMA)

| Failure Mode | Log / Telemetry Signature | Root Cause | Mitigation Action |
| --- | --- | --- | --- |
| APM Connection Block | `langfuse SDK Timeout: Failed to flush trace batch` | The background worker thread cannot reach the telemetry host endpoint. | Verify LANGFUSE_HOST settings and firewall routing. Configure client-side timeouts so connectivity issues do not crash execution threads. |
| Nesting Context Loss | Traces register as flat lists of independent items rather than parent-child relationships. | The context token was lost during async loop spawns or separate process invocations. | Ensure thread pools use OTel-compatible context wrapper functions to pass active span contexts. |
| Evaluator Rate Limits | `HTTP Error 429: Too Many Requests` | The evaluation loop exceeds the model provider's rate limits. | Implement batch queuing with throttling controls, or run a local, offline evaluation model using Ollama. |
| Telemetry Buffer Bloat | `Warning: Telemetry queue full. Dropping spans.` | The rate of incoming spans exceeds the rate of batch writes to the database. | Adjust maximum queue configurations in your SDK initializer, or run local OTel collectors to handle ingestion spikes. |
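For the Evaluator Rate Limits row, one common throttling control is retrying with exponential backoff and jitter. The sketch below is illustrative only: `RateLimitError`, `call_with_backoff`, and `flaky_judge` are hypothetical names, and a real client would raise its own exception type for HTTP 429.

```python
import random
import time


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 error."""


def call_with_backoff(fn, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Retry fn on RateLimitError with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # give up after the final attempt
            delay = base_delay * (2 ** attempt)
            # Jitter spreads retries out so parallel workers do not resync.
            sleep(delay + random.uniform(0, delay / 2))


# Fake judge call that is rate-limited twice before succeeding.
attempts = {"count": 0}


def flaky_judge():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitError("429 Too Many Requests")
    return "factual"


# Record the requested waits instead of actually sleeping.
waits = []
label = call_with_backoff(flaky_judge, sleep=waits.append)
print(label, attempts["count"], len(waits))
```

Injecting `sleep` keeps the helper testable; in production the default `time.sleep` applies the real delays.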

🧪 5. Runtime Verification: Telemetry & Test Assertions

A. Running Assertions and Telemetry Checks

  1. Execute the pytest regression suite:
    ```bash
    cd ~/AI_BOOTCAMP/labs/observability
    pytest -v test_prompt.py
    ```
  2. Confirm pytest output: verify that the test passes within the latency and cost budgets:
    ```text
    test_prompt.py::test_agent_structural_and_perf_bounds PASSED [100%]
    ======================= 1 passed in 0.52s =======================
    ```

B. Launching Phoenix Local Telemetry

  1. Run the local evaluator script:
    ```bash
    python phoenix_eval.py
    ```
  2. Verify dashboard launch: confirm that the Arize Phoenix listener initializes and binds to the default port. The explanation text varies between runs because it is generated by the judge model:
    ```text
    [INFO] Local Arize Phoenix Server active at: http://localhost:6006
    Running offline semantic evaluation...
    === EVALUATION COMPLETED ===
         label  score                                        explanation
    0  factual    0.0  The output matches the reference information...
    ```
  3. Open http://localhost:6006 in your browser while the Phoenix server is still running and verify that the dashboard loads.