M14: Real-Time Streaming & Agent UI
This module covers the design, protocols, and performance bottlenecks of real-time streaming interfaces for agentic workflows. You will learn to construct asynchronous stream generators in Python, expose ASGI-compliant Server-Sent Events (SSE) endpoints with FastAPI, and render multi-agent thought steps using collapsible Streamlit accordion controls.
Training Workspace: ~/AI_BOOTCAMP
1. Architectural Deep Dive: Streaming Protocols & Browser Bottlenecks
A traditional request/response call makes the client wait until the LLM has finished its entire response (often 5–15 seconds for long agent tasks). Streaming sends each token as soon as it is generated, so the wait before the user sees any output shrinks from the full completion time to the model's Time-to-First-Token (TTFT), which significantly improves perceived responsiveness.
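As a rough illustration of that difference, the sketch below times a simulated token stream and reports both the time to the first token and the time to the full response. The names fake_llm and measure are hypothetical, and the delays are arbitrary stand-ins for real generation latency.

```python
import asyncio
import time


async def fake_llm(n_tokens=5, delay=0.01):
    """Hypothetical stand-in for a model that emits one token per `delay` seconds."""
    for i in range(n_tokens):
        await asyncio.sleep(delay)
        yield f"tok{i} "


async def measure(stream):
    """Return (time to first token, total time, token count) for an async stream."""
    start = time.perf_counter()
    ttft = None
    count = 0
    async for _ in stream:
        if ttft is None:
            # First token arrived: this is what a streaming client waits for
            ttft = time.perf_counter() - start
        count += 1
    # Stream exhausted: this is what a non-streaming client waits for
    total = time.perf_counter() - start
    return ttft, total, count


if __name__ == "__main__":
    ttft, total, count = asyncio.run(measure(fake_llm()))
    print(f"first token after {ttft:.3f}s, all {count} tokens after {total:.3f}s")
```

A non-streaming client always waits for the total time, while a streaming client can start rendering after the first-token time.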
A. Server-Sent Events (SSE) Protocol Design
SSE is a text-based, unidirectional (server-to-client) streaming protocol built on top of standard HTTP. The server responds with a Content-Type: text/event-stream header and keeps the connection open, writing events as they occur. Each event consists of one or more field lines (such as event: and data:) and is terminated by a blank line, that is, a double newline (\n\n).
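The framing rule can be sketched as a small serializer. This is a minimal illustration, not part of any library: format_sse is a hypothetical helper name, and it assumes payloads are either plain strings or JSON-serializable objects.

```python
import json


def format_sse(data, event=None):
    """Serialize one SSE event (hypothetical helper, not a library API)."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines must be split across several data: lines
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line terminates the event
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse({"message": "Searching vector index..."}, event="status"), end="")
    print(format_sse({"word": "The"}, event="token"), end="")
```

The sample stream that follows shows the same framing on the wire.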
```text
event: status
data: {"message": "Searching vector index..."}

event: token
data: {"word": "The"}

event: token
data: {"word": " capital"}
```
B. Physical Constraints & Production Bottlenecks
- Browser Connection Limits (HTTP/1.1 vs. HTTP/2):
  - Under HTTP/1.1, browsers cap concurrent persistent connections to a single domain at about 6. Each open SSE stream holds one of them, so once a user has 6 tabs streaming agent interactions, further requests to that domain stall until one of the streams closes.
  - Mitigation: Serve the endpoint over HTTP/2 at your reverse proxy. HTTP/2 multiplexes many streams over a single TCP connection; the cap is negotiated between client and server and commonly defaults to 100.
- Reverse Proxy Buffering (Nginx, Cloudflare):
  - By default, reverse proxies buffer upstream responses to optimize network usage. With buffering enabled, the proxy holds SSE events and flushes them in large batches, which breaks the real-time stream.
  - Mitigation: Set explicit headers in the FastAPI response:
    - X-Accel-Buffering: no (disables Nginx buffering for this response)
    - Cache-Control: no-cache
- Backpressure and TCP Buffer Exhaustion:
  - If a client is on a slow network link, the server's TCP send buffer fills up. If the application keeps generating tokens without waiting for the socket to drain, unsent data accumulates in server memory and grows without bound.
  - Mitigation: Use ASGI-native servers (like Uvicorn/Hypercorn) that apply backpressure to the response stream and cancel the generation loop when a client disconnects.
- VRAM to Output Latency Coupling:
  - GPU inference engines generate tokens at a variable rate (e.g. 15–80 tokens per second). If the client triggers a layout repaint for every token, the repaints can saturate the main JavaScript thread and make the UI unresponsive.
  - Mitigation: Buffer incoming tokens on the client and flush them to the page in batches, for example once per animation frame.
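The backpressure principle from the list above can be shown in isolation with a bounded asyncio.Queue: the producer is suspended whenever the consumer falls behind, so buffered data never exceeds the queue size. This is a sketch of the idea only, not of how Uvicorn or Hypercorn implement flow control; producer, slow_consumer, and run_demo are hypothetical names.

```python
import asyncio


async def producer(queue, n_tokens, depths):
    """Generate tokens; put() suspends while the queue is full."""
    for i in range(n_tokens):
        await queue.put(f"tok{i}")
        depths.append(queue.qsize())  # record how much is buffered right now
    await queue.put(None)  # sentinel: no more tokens


async def slow_consumer(queue, delay):
    """Simulate a client on a slow link by pausing after every token."""
    received = []
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)
        await asyncio.sleep(delay)
    return received


async def run_demo(n_tokens=20, maxsize=4):
    queue = asyncio.Queue(maxsize=maxsize)  # the bound is the backpressure
    depths = []
    task = asyncio.create_task(producer(queue, n_tokens, depths))
    received = await slow_consumer(queue, delay=0.001)
    await task
    return received, max(depths)


if __name__ == "__main__":
    received, peak = asyncio.run(run_demo())
    print(f"delivered {len(received)} tokens, peak buffered items: {peak}")
```

With an unbounded queue the producer would race ahead and buffer every token in memory; the bound ties generation speed to delivery speed.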
2. Tradeoff Matrix: Real-Time UI Protocols
| Protocol | Directionality | Latency | Connection Overhead | Firewall Friendliness | Browser Connection Limit | Primary Production Bottleneck |
|---|---|---|---|---|---|---|
| Server-Sent Events (SSE) | Unidirectional (Server -> Client) | Low (~50 ms) | Low (Uses standard HTTP) | High (Standard port 80/443) | 6 per domain under HTTP/1.1 (multiplexed under HTTP/2, commonly ~100 streams per connection) | Reverse proxy buffering configurations |
| WebSockets | Bidirectional (Full Duplex) | Ultra-Low (<10 ms) | High (Requires handshake upgrade) | Moderate (Often blocked by strict corporate firewalls) | Not subject to the HTTP/1.1 per-domain limit | Per-connection state held in server memory |
| HTTP Long Polling | Simulates bidirectional | High (1–2 s) | High (Constant HTTP handshakes) | High | 6 per domain under HTTP/1.1 | Server-side CPU and thread exhaustion from repeated requests |
| GraphQL Subscriptions | Bidirectional wrapper | Low (~60 ms) | High (Client libraries + WebSockets) | Moderate (Relies on WebSocket transport) | Not subject to the HTTP/1.1 per-domain limit (WebSocket transport) | Library overhead and parsing complexity |
3. Step-by-Step Mechanics: Async Generators & FastAPI Streaming
A. Asynchronous Token Yielding (Async Generators)
A regular Python function produces nothing until it returns its final object. An async generator, declared with async def and containing yield, hands each value to the caller as soon as it is produced. Whenever it reaches an await, control returns to the event loop so other tasks can run in the meantime.
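To make the hand-off to the event loop visible, the sketch below runs two consumers concurrently, each draining its own async generator. The names ticker, consume, and run_interleaved are hypothetical; asyncio.sleep(0) is used only to give up control once per item.

```python
import asyncio


async def ticker(name, count):
    """Async generator: yields a value, then lets the event loop run other tasks."""
    for i in range(count):
        yield f"{name}{i}"
        await asyncio.sleep(0)  # suspension point: control returns to the loop


async def consume(name, count, log):
    async for item in ticker(name, count):
        log.append(item)


async def run_interleaved():
    log = []
    # Both consumers share one thread; they alternate at each await
    await asyncio.gather(consume("A", 3, log), consume("B", 3, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_interleaved()))
    # prints ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

The interleaved order shows that neither generator monopolizes the loop. If the await were replaced with a blocking call, all of A's items would be logged before any of B's.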
Create ~/AI_BOOTCAMP/labs/agent-ui/token_yielder.py:
```python
import asyncio


async def stream_agent_thoughts():
    """Simulates an agent executing cognitive steps and yielding text tokens."""
    steps = [
        "THOUGHT: Evaluating schema layout...\n",
        "ACTION: Querying index parameters...\n",
        "RESPONSE: Operations running normal.\n",
    ]
    for step in steps:
        # Yield the header of the cognitive phase
        yield step
        await asyncio.sleep(0.5)  # Yield control to the event loop
        # Stream sub-tokens word-by-word
        detail = "Executing local system command to verify file list..."
        for word in detail.split(" "):
            yield f"{word} "
            # Non-blocking async sleep simulates generation latency
            await asyncio.sleep(0.08)
        yield "\n\n"


async def main():
    print("Initiating async generator stream...\n")
    # Iterate asynchronously over the generator
    async for token in stream_agent_thoughts():
        print(token, end="", flush=True)
    print("\n✅ Stream completed successfully.")


if __name__ == "__main__":
    asyncio.run(main())
```
B. Deploying the FastAPI SSE Endpoint
Expose the async generator through an ASGI-compliant HTTP endpoint.
Create ~/AI_BOOTCAMP/labs/agent-ui/api_server.py:
```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="ASGI Agent Streaming Gateway")

# Enable CORS for frontend integration.
# A wildcard origin cannot be combined with credentials, so credentials stay off;
# list explicit origins instead if the frontend must send cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)


async def event_generator():
    """Generates SSE formatted events. Double newlines are mandatory."""
    try:
        steps = [
            "Parsing database context metadata...",
            "Initiating semantic search query in pgvector...",
            "Filtering output with local validation rules.",
        ]
        for idx, step in enumerate(steps, 1):
            # Send step headers as custom event types
            yield f"event: step\ndata: Step {idx}: {step}\n\n"
            await asyncio.sleep(1.0)
        # Send content as token events
        tokens = ["Summarized", " output:", " Active", " connection", " verified."]
        for token in tokens:
            yield f"event: token\ndata: {token}\n\n"
            await asyncio.sleep(0.1)
        # Signal stream end
        yield "event: close\ndata: [DONE]\n\n"
    except asyncio.CancelledError:
        # Triggered when the client closes the browser connection
        print("Connection closed by client. Terminating stream execution.")
        raise  # Re-raise so the server can finish cancelling the task


@app.get("/stream-agent")
async def stream_agent():
    # Set headers to prevent proxy buffering
    headers = {
        "X-Accel-Buffering": "no",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=headers,
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run("api_server:app", host="0.0.0.0", port=8000, log_level="info")
```
C. Constructing the Accordion Streamlit Interface
Implement a frontend that separates raw multi-agent logs (contained in an expandable accordion/status container) from the final response output.
Create ~/AI_BOOTCAMP/labs/agent-ui/app_ui.py:
```python
import streamlit as st
import time

st.set_page_config(page_title="AI Agent Execution UI", layout="centered")
st.title("Multi-Agent Execution Portal")

if st.button("Trigger Agentic Execution"):
    # Create the accordion/status container to group system updates
    with st.status("Agent resolving request...", expanded=True) as status_box:
        st.write("Extracting local system files...")
        time.sleep(1.0)
        st.write("Invoking sub-agent compiler...")
        time.sleep(1.0)
        st.write("Validating package security stubs...")
        time.sleep(1.0)
        # Collapse container on completion to clean up UI space
        status_box.update(label="Tasks completed successfully", state="complete", expanded=False)

    st.success("✅ Run Completed")
    st.markdown("### Final System Report:")
    st.write("Target files verified. System executing within nominal parameters.")
```
4. Failure Mode Analysis (FMA)
| Failure Mode | Log / UI Signature | Root Cause | Mitigation Action |
|---|---|---|---|
| Proxy Buffering | UI freezes for 5–10 seconds, then dumps all text chunks at once. | Nginx or Cloudflare is buffering the HTTP response. | Include the X-Accel-Buffering: no header in the FastAPI response and add proxy_buffering off; to your Nginx location block. |
| Event Loop Blocking | The server freezes and fails to handle concurrent API requests during streaming. | Synchronous functions (e.g. time.sleep()) are executed inside the async generator. | Replace blocking calls with async alternatives (e.g. await asyncio.sleep()) or run sync operations in separate threads (e.g. with asyncio.to_thread()). |
| CORS Access Denied | Browser console shows: No 'Access-Control-Allow-Origin' header is present on the requested resource. | The frontend is hosted on a separate port or domain, and CORS middleware is missing. | Add and configure CORSMiddleware in FastAPI to allow the host origin. |
| Memory Leak / Hangs | RuntimeError: Stream closed ignored. Server memory increases. | The generator does not handle client disconnection, leaking execution resources. | Catch asyncio.CancelledError (or use try/finally) inside the generator to stop active loops and release resources, then re-raise it. |
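For the Event Loop Blocking row, one way to keep a synchronous call out of the event loop is asyncio.to_thread (Python 3.9+). The sketch below is illustrative only: blocking_lookup stands in for any synchronous library call, and the heartbeat task exists purely to show that the loop keeps running while the worker thread is busy.

```python
import asyncio
import time


def blocking_lookup(step):
    """Hypothetical synchronous call (e.g. a legacy database driver)."""
    time.sleep(0.05)
    return f"result for {step}"


async def stream_with_offload(steps):
    for step in steps:
        # Runs in a worker thread; the event loop stays free meanwhile
        yield await asyncio.to_thread(blocking_lookup, step)


async def heartbeat(counter, interval=0.01):
    """Counts how often the event loop gets a chance to run."""
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(interval)


async def run_check():
    counter = {"ticks": 0}
    hb = asyncio.create_task(heartbeat(counter))
    results = [r async for r in stream_with_offload(["a", "b"])]
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return results, counter["ticks"]


if __name__ == "__main__":
    results, ticks = asyncio.run(run_check())
    print(results, f"heartbeat ticks while streaming: {ticks}")
```

If blocking_lookup were called directly inside the generator, the heartbeat would stop for the duration of each call, which is the freeze described in the table.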
5. Runtime Verification: Auditing Streams & UI Outputs
A. Testing API Stream via Command Line
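- Launch the FastAPI Server:
  ```bash
  cd ~/AI_BOOTCAMP/labs/agent-ui
  python api_server.py
  ```
- Audit stream using curl: Execute an HTTP request with the no-buffer (-N) flag:
  ```bash
  curl -N -H "Accept: text/event-stream" http://localhost:8000/stream-agent
  ```
- Inspect raw stdout: Ensure the events arrive one at a time as they are generated rather than in a single batch. The first events look like this:
  ```text
  event: step
  data: Step 1: Parsing database context metadata...

  event: step
  data: Step 2: Initiating semantic search query in pgvector...

  event: step
  data: Step 3: Filtering output with local validation rules.

  event: token
  data: Summarized

  event: token
  data:  output:
  ```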
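The same check can be done programmatically by parsing the raw lines back into events. The sketch below is a simplified parser that handles only the event: and data: fields and comment lines, not the full specification (id: and retry: are ignored); parse_sse is a hypothetical helper name.

```python
def parse_sse(lines):
    """Yield (event, data) pairs from an iterable of SSE text lines."""
    event, data = "message", []  # "message" is the default event type
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, if it carries data
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # exactly one leading space is stripped
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)


if __name__ == "__main__":
    sample = "event: step\ndata: Step 1: Parsing\n\nevent: token\ndata:  output:\n\n"
    for event, data in parse_sse(sample.splitlines()):
        print(repr(event), repr(data))
```

Because only one space after the colon is stripped, a token such as " output:" keeps its leading space, which is why it appears with two spaces after data: in the raw stream.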
B. Verifying Streamlit Thought Accordions
- Start the Streamlit application:
  ```bash
  streamlit run app_ui.py --server.port 8501
  ```
- Open your browser and navigate to http://localhost:8501.
- Click Trigger Agentic Execution. Verify that:
  - The status container expands to display live sub-task outputs.
  - The status container collapses to a clean green header upon task completion.
  - The final response is displayed underneath.
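The lab UI drives the status container with fixed sleeps. To feed it from the live stream instead, the events would first need to be split into status-log lines and final-answer text. A minimal sketch of that routing step follows, assuming the events have already been parsed into (event, data) pairs that use the step, token, and close event names from api_server.py; route_events is a hypothetical helper.

```python
def route_events(events):
    """Split (event, data) pairs into status-log lines and the final answer text."""
    steps, answer_parts = [], []
    for event, data in events:
        if event == "step":
            steps.append(data)  # belongs in the collapsible status container
        elif event == "token":
            answer_parts.append(data)  # belongs in the final response area
        elif event == "close":
            break  # end-of-stream marker sent by the server
    return steps, "".join(answer_parts)


if __name__ == "__main__":
    events = [
        ("step", "Step 1: Parsing database context metadata..."),
        ("token", "Summarized"),
        ("token", " output:"),
        ("close", "[DONE]"),
    ]
    steps, answer = route_events(events)
    print(steps)
    print(answer)
```

In a Streamlit app, each entry in steps could be written inside the st.status block and answer rendered underneath it once the container collapses.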