📈 M14: Real-Time Streaming & Agent UI

This module covers the design, protocols, and performance bottlenecks of real-time streaming interfaces for agentic workflows. You will learn to construct asynchronous stream generators in Python, expose ASGI-compliant Server-Sent Events (SSE) endpoints with FastAPI, and render multi-agent thought steps using collapsible Streamlit accordion controls.

Training Workspace: ~/AI_BOOTCAMP


🏛️ 1. Architectural Deep Dive: Streaming Protocols & Browser Bottlenecks

A traditional request/response call makes the client wait until the LLM has finished its entire response (often 5–15 seconds for long agent tasks). With streaming, the client can render output as soon as the first token arrives, so perceived latency drops from the full generation time to the Time-to-First-Token (TTFT), which is typically a small fraction of it.

A. Server-Sent Events (SSE) Protocol Design

SSE is a text-based, unidirectional (server-to-client) streaming protocol built on standard HTTP. The server responds with a Content-Type: text/event-stream header and keeps the connection open. Each event is a group of field lines such as event: and data:, and a blank line (a double newline, \n\n) terminates the event.

text
event: status
data: {"message": "Searching vector index..."}

event: token
data: {"word": "The"}

event: token
data: {"word": " capital"}
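
The framing above can be produced by a small helper. This is a minimal sketch, not part of the module's lab files: the name format_sse is hypothetical, and it assumes payloads are already strings (here JSON-encoded). It also splits multi-line payloads across several data: lines, since a raw blank line inside a payload would otherwise end the event early.

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame one SSE event; the trailing blank line marks the event boundary."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines must be split across several data: lines,
    # otherwise an embedded blank line would terminate the event prematurely.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"


status = format_sse(json.dumps({"message": "Searching vector index..."}), event="status")
token = format_sse(json.dumps({"word": "The"}), event="token")
print(status + token, end="")
```

Centralizing the framing in one function keeps the mandatory double newline out of every call site, which is where it tends to get forgotten.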

B. Physical Constraints & Production Bottlenecks

  1. Browser Connection Limits (HTTP/1.1 vs. HTTP/2):
    • Under HTTP/1.1, browsers cap concurrent persistent connections to a single domain at 6, and the cap is shared across all tabs. If a user opens more than 6 tabs that each hold a stream open, further requests to that domain queue until one of the existing streams closes.
    • Mitigation: Serve the endpoint over HTTP/2 at your reverse proxy. HTTP/2 multiplexes many streams over a single TCP connection; the maximum is negotiated between client and server and commonly defaults to 100.
  2. Reverse Proxy Buffering (Nginx, Cloudflare):
    • By default, reverse proxies buffer upstream data packets to optimize network usage. If buffering is enabled, the proxy holds onto SSE tokens and flushes them in large batches, breaking the real-time stream.
    • Mitigation: Set explicit headers in the FastAPI response:
      • X-Accel-Buffering: no (disables Nginx buffering)
      • Cache-Control: no-cache
  3. Backpressure and TCP Buffer Exhaustion:
    • If a client is on a slow network link, the server's TCP send buffer fills up. If the application keeps generating tokens without waiting for the socket to drain, the unsent data piles up in server memory and usage grows without bound.
    • Mitigation: Use ASGI-native servers (such as Uvicorn or Hypercorn) that apply write flow control and cancel the generator when a client disconnects.
  4. VRAM to Output Latency Coupling:
    • GPU inference engines generate tokens at a variable rate (e.g. 15–80 tokens per second). If the client re-renders the page on every token, layout and repaint work can saturate the browser's main JavaScript thread and make the UI unresponsive.
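
For the rendering bottleneck in item 4, one possible mitigation (an assumption here, not something the module prescribes) is to coalesce tokens into small chunks before they reach the UI, so the client repaints less often. The sketch below uses hypothetical names (fake_tokens, coalesce, max_tokens) and batches by count; a time-window variant would follow the same shape.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_tokens() -> AsyncIterator[str]:
    """Stand-in for a model token stream (hypothetical)."""
    for word in ["The", " capital", " of", " France", " is", " Paris", "."]:
        await asyncio.sleep(0)  # hand control to the event loop between tokens
        yield word


async def coalesce(tokens: AsyncIterator[str], max_tokens: int = 3) -> AsyncIterator[str]:
    """Group up to max_tokens tokens into one chunk to cut UI repaint frequency."""
    buffer: List[str] = []
    async for token in tokens:
        buffer.append(token)
        if len(buffer) >= max_tokens:
            yield "".join(buffer)
            buffer.clear()
    if buffer:  # flush whatever is left when the source ends
        yield "".join(buffer)


async def collect() -> List[str]:
    return [chunk async for chunk in coalesce(fake_tokens())]


chunks = asyncio.run(collect())
print(chunks)
```

The trade-off is latency against smoothness: larger chunks mean fewer repaints but a choppier typing effect.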

📊 2. Tradeoff Matrix: Real-Time UI Protocols

| Protocol | Directionality | Latency | Connection Overhead | Firewall Friendliness | Browser Connection Limit | Primary Production Bottleneck |
| --- | --- | --- | --- | --- | --- | --- |
| Server-Sent Events (SSE) | Unidirectional (Server -> Client) | Low (~50ms) | Low (Uses standard HTTP) | High (Standard port 80/443) | 6 per domain under HTTP/1.1 (negotiated under HTTP/2, commonly 100 streams) | Reverse proxy buffering configurations |
| WebSockets | Bidirectional (Full Duplex) | Ultra-Low (<10ms) | High (Requires handshake upgrade) | Moderate (Often blocked by strict corporate firewalls) | Not subject to the 6-per-domain HTTP/1.1 cap | High connection state memory tracking on server |
| HTTP Long Polling | Simulates bidirectional | High (1–2s) | High (Constant HTTP handshakes) | High | 6 per domain under HTTP/1.1 | Severe server-side CPU and thread exhaustion |
| GraphQL Subscriptions | Bidirectional wrapper | Low (~60ms) | High (Client libraries + WebSockets) | Moderate (Relies on WebSocket transport) | Not subject to the 6-per-domain HTTP/1.1 cap | Library overhead and parsing complexity |

🛠️ 3. Step-by-Step Mechanics: Async Generators & FastAPI Streaming

A. Asynchronous Token Yielding (Async Generators)

A regular Python function runs to completion and returns a single final object. An async generator is declared with async def and uses yield to hand each value to its consumer as soon as it is ready; at every await inside it, control returns to the event loop so other tasks can run.

Create ~/AI_BOOTCAMP/labs/agent-ui/token_yielder.py:

python
import asyncio

async def stream_agent_thoughts():
    """Simulates an agent executing cognitive steps and yielding text tokens."""
    steps = [
        "THOUGHT: Evaluating schema layout...\n",
        "ACTION: Querying index parameters...\n",
        "RESPONSE: Operations running normal.\n"
    ]
    
    for step in steps:
        # Yield the header of the cognitive phase
        yield step
        await asyncio.sleep(0.5)  # Yield control to the event loop
        
        # Stream sub-tokens word-by-word
        detail = "Executing local system command to verify file list..."
        for word in detail.split(" "):
            yield f"{word} "
            # Non-blocking async sleep simulates generation latency
            await asyncio.sleep(0.08)
        yield "\n\n"

async def main():
    print("Initiating async generator stream...\n")
    # Iterate asynchronously over the generator
    async for token in stream_agent_thoughts():
        print(token, end="", flush=True)
    print("\n✅ Stream completed successfully.")

if __name__ == "__main__":
    asyncio.run(main())
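
To see why the await points matter, the following sketch runs two async generators side by side. The names ticker, consume, and log are hypothetical and exist only for this illustration. Because each generator suspends at await asyncio.sleep(0), the event loop can alternate between the two consumers instead of finishing one before starting the other.

```python
import asyncio
from typing import AsyncIterator, List

log: List[str] = []


async def ticker(name: str, count: int) -> AsyncIterator[str]:
    """Yield labelled items, suspending at the await before each one."""
    for i in range(count):
        await asyncio.sleep(0)  # suspension point: other tasks may run here
        yield f"{name}{i}"


async def consume(name: str, count: int) -> None:
    async for item in ticker(name, count):
        log.append(item)


async def main() -> None:
    # Both consumers share one event loop and one thread.
    await asyncio.gather(consume("a", 3), consume("b", 3))


asyncio.run(main())
print(log)
```

If the await were replaced by a blocking call such as time.sleep(), the first consumer would run to completion before the second produced anything.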

B. Deploying the FastAPI SSE Endpoint

Expose the async generator through an ASGI-compliant HTTP endpoint.

Create ~/AI_BOOTCAMP/labs/agent-ui/api_server.py:

python
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="ASGI Agent Streaming Gateway")

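# Enable CORS for frontend integration.
# Browsers reject a wildcard origin on credentialed requests, so credentials
# stay disabled here; list explicit origins if cookies or auth are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)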

async def event_generator():
    """Generates SSE formatted events. Double newlines are mandatory."""
    try:
        steps = [
            "Parsing database context metadata...",
            "Initiating semantic search query in pgvector...",
            "Filtering output with local validation rules."
        ]
        
        for idx, step in enumerate(steps, 1):
            # Send step headers as custom event types
            yield f"event: step\ndata: Step {idx}: {step}\n\n"
            await asyncio.sleep(1.0)
            
            # Send content as standard token data
            tokens = ["Summarized", " output:", " Active", " connection", " verified."]
            for token in tokens:
                yield f"event: token\ndata: {token}\n\n"
                await asyncio.sleep(0.1)
                
        # Signal stream end
        yield "event: close\ndata: [DONE]\n\n"
    except asyncio.CancelledError:
        # Raised when the server cancels the stream, e.g. after the client disconnects
        print("Connection closed by client. Terminating stream execution.")
        raise  # Re-raise so cancellation propagates and the request is torn down

@app.get("/stream-agent")
async def stream_agent():
    # Set headers to prevent proxy buffering
    headers = {
        "X-Accel-Buffering": "no",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive"
    }
    return StreamingResponse(
        event_generator(), 
        media_type="text/event-stream", 
        headers=headers
    )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("api_server:app", host="0.0.0.0", port=8000, log_level="info")
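
The disconnect path can be exercised without a browser. The sketch below is a simplified stand-in, not the server's actual machinery: it assumes that cancelling the consuming task approximates what the ASGI server does when a client goes away, and cleanup_log is a hypothetical name used to record what the generator did on the way out.

```python
import asyncio
from typing import AsyncIterator, List

cleanup_log: List[str] = []


async def event_generator() -> AsyncIterator[str]:
    try:
        n = 0
        while True:  # an unbounded stream, like a long agent run
            yield f"event: token\ndata: {n}\n\n"
            n += 1
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        cleanup_log.append("cancelled")
        raise  # re-raise so the caller sees the cancellation
    finally:
        # Runs on cancellation and on normal exit: close handles, stop loops.
        cleanup_log.append("resources released")


async def consume(received: List[str]) -> None:
    async for chunk in event_generator():
        received.append(chunk)


async def main() -> List[str]:
    received: List[str] = []
    task = asyncio.create_task(consume(received))
    await asyncio.sleep(0.05)
    task.cancel()  # stands in for the server reacting to a client disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received


received = asyncio.run(main())
print(len(received) > 0, cleanup_log)
```

Putting cleanup in finally rather than only in the except branch means it also runs when the stream ends normally or fails with a different exception.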

C. Constructing the Accordion Streamlit Interface

Implement a frontend that separates raw multi-agent logs (contained in an expandable accordion/status container) from the final response output.

Create ~/AI_BOOTCAMP/labs/agent-ui/app_ui.py:

python
import streamlit as st
import time

st.set_page_config(page_title="AI Agent Execution UI", layout="centered")
st.title("🧠 Multi-Agent Execution Portal")

if st.button("Trigger Agentic Execution"):
    # Create the accordion/status container to group system updates
    with st.status("Agent resolving request...", expanded=True) as status_box:
        st.write("🔍 Extracting local system files...")
        time.sleep(1.0)

        st.write("🤖 Invoking sub-agent compiler...")
        time.sleep(1.0)

        st.write("🛡️ Validating package security stubs...")
        time.sleep(1.0)
        
        # Collapse container on completion to clean up UI space
        status_box.update(label="Tasks completed successfully", state="complete", expanded=False)
        
    st.success("✅ Run Completed")
    st.markdown("### Final System Report:")
    st.write("Target files verified. System executing within nominal parameters.")

🔍 4. Failure Mode Analysis (FMA)

| Failure Mode | Log / UI Signature | Root Cause | Mitigation Action |
| --- | --- | --- | --- |
| Proxy Buffering | UI freezes for 5-10 seconds, then dumps all text chunks at once. | Nginx or Cloudflare is buffering the HTTP response. | Include the X-Accel-Buffering: no header in the FastAPI response and add proxy_buffering off; to your Nginx location block. |
| Event Loop Blocking | The server freezes and fails to handle concurrent API requests during streaming. | Synchronous functions (e.g. time.sleep()) are executed inside the async generator. | Replace blocking calls with async alternatives (e.g. await asyncio.sleep()) or run sync operations in separate threads. |
| CORS Access Denied | Browser console shows: Access-Control-Allow-Origin header missing. | The frontend is hosted on a separate port or domain, and CORS middleware is missing. | Add and configure CORSMiddleware in FastAPI to allow the host origin. |
| Memory Leak / Hangs | RuntimeError: Stream closed ignored. Server memory increases. | The generator does not handle client disconnection, leaking execution resources. | Catch asyncio.CancelledError inside the generator, release resources (ideally in a finally block), and re-raise. |
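
The "run sync operations in separate threads" mitigation for Event Loop Blocking can be sketched with asyncio.to_thread (Python 3.9+). The names blocking_lookup and heartbeat are hypothetical: the first stands in for any synchronous call you cannot rewrite, and the second only exists to show that the event loop keeps servicing other work while the blocking call runs.

```python
import asyncio
import time
from typing import AsyncIterator, List


def blocking_lookup(query: str) -> str:
    """Hypothetical synchronous call (e.g. a legacy SDK) that blocks its thread."""
    time.sleep(0.2)
    return f"result for {query}"


async def event_stream() -> AsyncIterator[str]:
    # Offload the blocking call to a worker thread so the event loop stays free.
    result = await asyncio.to_thread(blocking_lookup, "index status")
    yield f"event: token\ndata: {result}\n\n"


async def heartbeat(ticks: List[float]) -> None:
    """Records timestamps; it only advances if the event loop is not blocked."""
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main() -> tuple:
    ticks: List[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    events = [event async for event in event_stream()]
    ticks_during_stream = len(ticks)  # heartbeats that ran while streaming
    await beat
    return events, ticks_during_stream


events, ticks_during_stream = asyncio.run(main())
print(events, ticks_during_stream)
```

Calling blocking_lookup directly inside the generator would leave the heartbeat count at zero until the stream finished, which is the freeze described in the table.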

🧪 5. Runtime Verification: Auditing Streams & UI Outputs

A. Testing API Stream via Command Line

  1. Launch the FastAPI Server:
    bash
    cd ~/AI_BOOTCAMP/labs/agent-ui
    python api_server.py
  2. Audit stream using curl: Execute an HTTP request with the no-buffer (-N) flag:
    bash
    curl -N -H "Accept: text/event-stream" http://localhost:8000/stream-agent
  3. Inspect raw stdout: Confirm that events arrive one at a time as they are generated, not in a single batch at the end:
    text
    event: step
    data: Step 1: Parsing database context metadata...
    
    event: token
    data: Summarized
    
    event: token
    data:  output:
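
The same raw output can be checked programmatically. The parser below is a minimal sketch covering only the event: and data: fields and comment lines; parse_sse is a hypothetical name, and the sample text is copied from the endpoint's expected output rather than read from a live connection.

```python
from typing import Iterable, Iterator, List, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_type, data) pairs from decoded SSE lines."""
    event_type = "message"  # default type when no event: field is sent
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, then reset state.
            if data_parts:
                yield event_type, "\n".join(data_parts)
            event_type, data_parts = "message", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # only one leading space is stripped
        if field == "event":
            event_type = value
        elif field == "data":
            data_parts.append(value)


sample = (
    "event: step\ndata: Step 1: Parsing database context metadata...\n\n"
    "event: token\ndata: Summarized\n\n"
    "event: token\ndata:  output:\n\n"
    "event: close\ndata: [DONE]\n\n"
)
events = list(parse_sse(sample.split("\n")))
for event_type, data in events:
    print(repr(event_type), repr(data))
```

Only a single space after the colon is removed, which is why the token " output:" keeps its leading space and the words join correctly on the client. Against a running server, the lines could come from an HTTP client's line iterator instead of the sample string.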

B. Verifying Streamlit Thought Accordions

  1. Start the Streamlit application:
    bash
    streamlit run app_ui.py --server.port 8501
  2. Open your browser and navigate to http://localhost:8501.
  3. Click Trigger Agentic Execution. Verify that:
    • The status container expands to display live sub-task outputs.
    • The status container collapses to a clean green header upon task completion.
    • The final response is displayed underneath.