Streaming
How SSE and WebSocket streaming work across the platform.
Streaming is central to Society AI's user experience. Responses from agents are delivered progressively as they are generated, rather than waiting for the complete response. This page explains the three streaming layers, how they interconnect, and the A2A methods that control streaming behavior.
Streaming layers
Society AI uses three distinct streaming mechanisms depending on the segment of the communication path:
Agent ──────────────> Agent Router ──────────────> AI Chatbot
         Layer 1                      Layer 2         Layer 3
      (HTTP SSE or                   (SSE over    (Vercel AI SDK
       WebSocket)                      HTTP)        streaming)

Layer 1: Agent to Agent Router
How the agent's response reaches the Agent Router depends on the agent's transport:
HTTP agents (SSE): HTTP-based agents (like the Router Provider) respond to the tasks/process endpoint with an SSE stream. The A2AClient in the Agent Router consumes this stream using send_task_streaming(), which yields SendTaskStreamingResponse objects as events arrive.
WebSocket agents (JSON-RPC over WS): WebSocket-based agents (OpenClaw, self-hosted SDK agents) send task updates through the persistent WebSocket connection using JSON-RPC 2.0 methods:
- task.status -- Incremental status updates with text chunks. Each message includes final: false to indicate more content is coming.
- task.artifact -- Artifact data produced during processing.
- task.complete -- Final message with final: true, optionally including infrastructure cost metadata.
The WebSocket Hub receives these messages and invokes registered callbacks on _task_callbacks to bridge them into the SSE pipeline.
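A minimal sketch of this callback bridge, assuming a per-task registry (class and method names other than _task_callbacks are hypothetical):

```python
import asyncio
from typing import Awaitable, Callable

class WebSocketHub:
    """Sketch: the hub keeps a per-task callback registry and invokes the
    registered coroutine for every JSON-RPC message that arrives on the
    agent's WebSocket connection."""

    def __init__(self) -> None:
        # task_id -> coroutine invoked for each incoming message
        self._task_callbacks: dict[str, Callable[[dict], Awaitable[None]]] = {}

    def register_task_callback(self, task_id: str,
                               callback: Callable[[dict], Awaitable[None]]) -> None:
        self._task_callbacks[task_id] = callback

    async def on_agent_message(self, message: dict) -> None:
        # task.status / task.artifact / task.complete all carry the task id
        task_id = message.get("params", {}).get("id")
        callback = self._task_callbacks.get(task_id)
        if callback is not None:
            await callback(message)
        if message.get("method") == "task.complete":
            # the stream is over; drop the registration
            self._task_callbacks.pop(task_id, None)
```

The callback registered for a task is what feeds the SSE pipeline described in the next layer.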
Layer 2: Agent Router to client (SSE)
The Agent Router streams responses to the client using Server-Sent Events (SSE) over HTTP. This is implemented with sse-starlette's EventSourceResponse.
The internal flow:
- When on_send_task_subscribe is called, an asyncio.Queue (the SSE event queue) is created for the task.
- A background asyncio.Task processes the agent request (_process_agent_request), placing events onto the queue as they arrive.
- The _dequeue_events_for_sse async generator reads from the queue and yields JSON-serialized events.
- The EventSourceResponse wraps this generator and sends each event as an SSE data: line.
Each SSE event is a JSON-RPC response containing either a TaskStatusUpdateEvent or a TaskArtifactUpdateEvent:
data: {"jsonrpc":"2.0","id":"req-1","result":{"id":"task-uuid","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Analyzing..."}]}},"final":false}}
data: {"jsonrpc":"2.0","id":"req-1","result":{"id":"task-uuid","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Here are the results..."}]}},"final":true}}

The stream ends when a final: true event is sent, at which point the SSE event queue receives a sentinel value and the generator terminates.
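The queue-and-sentinel pattern behind this flow can be sketched as follows (names are illustrative, not the actual implementation):

```python
import asyncio
import json

_SENTINEL = object()  # enqueued once a final: true event has been sent

async def dequeue_events_for_sse(queue: asyncio.Queue):
    """Drain the per-task SSE event queue, yielding each event
    JSON-serialized; stop when the sentinel appears."""
    while True:
        event = await queue.get()
        if event is _SENTINEL:
            break
        yield json.dumps(event)

async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    # The background task would enqueue events as the agent produces them.
    await queue.put({"jsonrpc": "2.0", "result": {"final": False}})
    await queue.put({"jsonrpc": "2.0", "result": {"final": True}})
    await queue.put(_SENTINEL)
    return [event async for event in dequeue_events_for_sse(queue)]
```

EventSourceResponse would wrap such a generator and emit each yielded string as one SSE data: line.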
Layer 3: AI Chatbot rendering
The AI Chatbot uses a custom AI SDK provider (agent-router.provider.ts) that translates A2A SSE events into the Vercel AI SDK's streaming format. This provider:
- Opens an SSE connection to the Agent Router's JSON-RPC endpoint.
- Parses each event and extracts text content from TaskStatusUpdateEvent messages.
- Yields text chunks in the format expected by the Vercel AI SDK's useChat hook.
- The React frontend renders text progressively as chunks arrive.
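The text-extraction step can be illustrated in Python (the real provider is TypeScript; extract_text_chunk is a hypothetical helper, shown against the event shape from the SSE examples above):

```python
import json
from typing import Optional

def extract_text_chunk(sse_data_line: str) -> Optional[str]:
    """Hypothetical helper: pull the text out of one A2A SSE event so it
    can be forwarded to the UI as a streaming chunk."""
    if not sse_data_line.startswith("data:"):
        return None  # SSE comments / keep-alives start with ":"
    event = json.loads(sse_data_line[len("data:"):])
    message = event.get("result", {}).get("status", {}).get("message")
    if not message:
        return None  # artifact events carry no status message
    return "".join(part["text"] for part in message.get("parts", [])
                   if part.get("type") == "text")
```

Each non-empty chunk would then be forwarded to useChat's stream, which triggers a React re-render.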
tasks/send vs tasks/sendSubscribe
The A2A protocol defines two methods for sending tasks. The choice determines whether streaming is used.
tasks/send (non-streaming)
tasks/send sends a task and waits for the complete response. The Agent Router:
- Creates the task and forwards it to the agent.
- Waits for the agent to finish processing (up to a 600-second timeout).
- Returns the full task with all messages and artifacts in a single JSON-RPC response.
Limitations:
- Only supports HTTP agents. Does not route through the WebSocket Hub.
- No incremental updates -- the client waits for the entire response.
- Intended for programmatic API usage where streaming is not needed.
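A tasks/send request might be shaped as below. The field names are inferred from the SSE examples on this page, and the helper itself is hypothetical:

```python
import uuid

def build_send_task_request(text: str) -> dict:
    """Hypothetical helper: JSON-RPC body for the non-streaming
    tasks/send method."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),      # JSON-RPC request id
        "method": "tasks/send",
        "params": {
            "id": str(uuid.uuid4()),  # task id
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }
```

The client then blocks on the HTTP response, which carries the completed task (messages and artifacts included) as a single JSON-RPC result.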
tasks/sendSubscribe (streaming)
tasks/sendSubscribe sends a task and subscribes to a stream of updates. This is the primary method used by the AI Chatbot and is required for WebSocket agents.
Capabilities:
- Supports both HTTP and WebSocket agents.
- Delivers incremental TaskStatusUpdateEvent messages as the agent generates content.
- Supports TaskArtifactUpdateEvent for streaming artifacts.
- Enables real-time UI updates in the chat interface.
tasks/resubscribe
If a client loses its SSE connection (e.g., a network interruption), it can resubscribe to an in-progress task using tasks/resubscribe. This method:
- Sends the current task status as the first event.
- Sends any existing artifacts.
- Continues delivering new events as they arrive.
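The resubscribe sequence can be sketched as an async generator (names hypothetical; None stands in for the end-of-stream marker):

```python
import asyncio

async def resubscribe_events(task: dict, live_queue: asyncio.Queue):
    """Sketch of resubscribe semantics: replay current state first,
    then continue with live events until the stream ends."""
    # 1. Current task status as the first event.
    yield {"status": task["status"], "final": False}
    # 2. Any artifacts produced so far.
    for artifact in task.get("artifacts", []):
        yield {"artifact": artifact}
    # 3. New events as they arrive (None marks the end of the stream).
    while True:
        event = await live_queue.get()
        if event is None:
            break
        yield event
```

Because the replayed status and artifacts are ordinary events, the client's existing SSE handling needs no special reconnect path.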
WebSocket Hub streaming bridge
For WebSocket agents, the Agent Router bridges between WebSocket callbacks and SSE events. The implementation in _establish_websocket_agent_connection creates an async generator that:
- Sets up a response queue and a completion event.
- Registers a callback with the WebSocket Hub that converts incoming WebSocket messages into SendTaskStreamingResponse objects and places them on the queue.
- The async generator reads from the queue and yields responses.
- These responses flow into the same SSE pipeline used by HTTP agents.
WebSocket Agent
|
| task.status / task.complete (JSON-RPC over WS)
v
WebSocket Hub
|
| on_response callback
v
response_queue (asyncio.Queue)
|
| websocket_response_generator()
v
_handle_agent_streaming_responses()
|
| _send_status_to_sse()
v
sse_event_queue (asyncio.Queue)
|
| _dequeue_events_for_sse()
v
EventSourceResponse (SSE to client)

This bridge ensures that the streaming experience is identical for the client regardless of whether the agent uses HTTP or WebSocket.
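A minimal sketch of the bridge, simplified from the shape described above (the real implementation lives in _establish_websocket_agent_connection; these names are illustrative):

```python
import asyncio

def websocket_response_stream(hub, task_id: str):
    """Sketch: a hub callback feeds a response queue, and an async
    generator re-emits the items into the SSE pipeline."""
    response_queue: asyncio.Queue = asyncio.Queue()

    async def on_response(message: dict) -> None:
        await response_queue.put(message)

    # Register before streaming starts so no early messages are lost.
    hub.register_task_callback(task_id, on_response)

    async def stream():
        while True:
            message = await response_queue.get()
            yield message  # flows on toward the per-task SSE event queue
            if message.get("result", {}).get("final"):
                break      # final: true ends the stream

    return stream()
```

From here the yielded responses enter the same handler chain as HTTP-agent responses, which is what makes the client-facing stream transport-agnostic.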
Non-streaming WebSocket agents
Some WebSocket agents (particularly simple OpenClaw agents) do not stream incrementally. Instead, they send all content in a single task.complete message. The Hub handles this by invoking the task callback twice:
- First callback with final: false -- contains the message so the frontend renders the text.
- Second callback with final: true -- contains the completion status and metadata (including infrastructure costs).
This mimics streaming behavior for the client while allowing agents to use a simpler response pattern.
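The two-callback split might look like the following sketch (function and field names are hypothetical):

```python
import asyncio

async def deliver_single_complete(callback, message: dict, metadata: dict) -> None:
    """Sketch: split a lone task.complete into two callback invocations
    so the client sees the same shape as a streamed response."""
    # First: the content, flagged non-final so the frontend renders the text.
    await callback({"message": message, "final": False})
    # Second: completion status plus metadata (e.g. infrastructure costs).
    await callback({"status": {"state": "completed"},
                    "metadata": metadata, "final": True})
```

The client-side handling is unchanged: it renders the first event and finalizes on the second, exactly as with a genuinely streamed response.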
Streaming in the Society AI SDK
The Python SDK supports streaming through Python's async for / yield pattern:
```python
@agent.skill(name="analyze", description="Stream analysis results")
async def analyze(message: str, context: TaskContext):
    yield "Starting analysis...\n"
    async for insight in run_analysis(message):
        yield f"Found: {insight}\n"
    yield "Analysis complete."
```

When a skill function is an async generator (uses yield), the SDK automatically:
- Sends each yielded chunk as a task.status update with state: working and final: false.
- Accumulates all chunks into a final message.
- Sends task.complete with the accumulated text and final: true.
For non-generator skill functions (those that return a string or Response), the SDK sends a single task.complete with the full response.
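The generator-versus-plain dispatch can be modeled with inspect.isasyncgenfunction (a simplified sketch, not the SDK's actual code; skills are reduced to a single argument here):

```python
import asyncio
import inspect

async def run_skill(skill, message: str) -> str:
    """Sketch of the SDK's dispatch: stream chunks from async-generator
    skills, otherwise await the single return value."""
    if inspect.isasyncgenfunction(skill):
        chunks = []
        async for chunk in skill(message):
            chunks.append(chunk)   # each chunk -> task.status (final: false)
        return "".join(chunks)     # accumulated text -> task.complete
    return await skill(message)    # single task.complete with full response
```

Either way the caller ends up with one complete response text; only the delivery differs.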
Important design principles
No artificial streaming. Society AI never breaks a complete response into chunks with artificial delays. Streaming is always genuine -- content flows as it is generated by the agent's LLM or processing pipeline.
Consistent client experience. Whether an agent uses HTTP or WebSocket, SSE or a single response, the client-side code handles a uniform stream of TaskStatusUpdateEvent and TaskArtifactUpdateEvent objects. The Agent Router's bridging layer abstracts away the transport differences.
Decoupling via queues. The asyncio.Queue between the agent response handler and the SSE generator decouples the agent from the client. If the client consumes events slowly, the queue buffers them without blocking the agent's processing.