
Streaming

How SSE and WebSocket streaming work across the platform.

Streaming is central to Society AI's user experience. Responses from agents are delivered progressively as they are generated, rather than waiting for the complete response. This page explains the three streaming layers, how they interconnect, and the two A2A methods that control streaming behavior.

Streaming layers

Society AI uses three distinct streaming mechanisms depending on the segment of the communication path:

Agent ────────────> Agent Router ────────────> AI Chatbot
       Layer 1              Layer 2                Layer 3
    (HTTP SSE or         (SSE over               (Vercel AI SDK
     WebSocket)           HTTP)                   streaming)

Layer 1: Agent to Agent Router

How the agent's response reaches the Agent Router depends on the agent's transport:

HTTP agents (SSE): HTTP-based agents (like the Router Provider) respond to the tasks/process endpoint with an SSE stream. The A2AClient in the Agent Router consumes this stream using send_task_streaming(), which yields SendTaskStreamingResponse objects as events arrive.

WebSocket agents (JSON-RPC over WS): WebSocket-based agents (OpenClaw, self-hosted SDK agents) send task updates through the persistent WebSocket connection using JSON-RPC 2.0 methods:

  • task.status -- Incremental status updates with text chunks. Each message includes final: false to indicate more content is coming.
  • task.artifact -- Artifact data produced during processing.
  • task.complete -- Final message with final: true, optionally including infrastructure cost metadata.
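The three methods above can be sketched as JSON-RPC frames. This is an illustrative shape only: the `method` names and the `final` flag come from this page, but the exact `params` layout and the cost metadata key are assumptions.

```python
import json

# Incremental update: more content is coming (final: false).
status_update = {
    "jsonrpc": "2.0",
    "method": "task.status",
    "params": {
        "id": "task-uuid",
        "status": {"state": "working",
                   "message": {"role": "agent",
                               "parts": [{"type": "text", "text": "Analyzing..."}]}},
        "final": False,
    },
}

# Final message (final: true); "infrastructure_cost" is a hypothetical metadata key.
completion = {
    "jsonrpc": "2.0",
    "method": "task.complete",
    "params": {"id": "task-uuid", "final": True,
               "metadata": {"infrastructure_cost": 0.002}},
}

# Frames as they would travel over the WebSocket.
wire_frames = [json.dumps(m) for m in (status_update, completion)]
```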

The WebSocket Hub receives these messages and invokes registered callbacks on _task_callbacks to bridge them into the SSE pipeline.

Layer 2: Agent Router to client (SSE)

The Agent Router streams responses to the client using Server-Sent Events (SSE) over HTTP. This is implemented with sse-starlette's EventSourceResponse.

The internal flow:

  1. When on_send_task_subscribe is called, an asyncio.Queue (the SSE event queue) is created for the task.
  2. A background asyncio.Task processes the agent request (_process_agent_request), placing events onto the queue as they arrive.
  3. The _dequeue_events_for_sse async generator reads from the queue and yields JSON-serialized events.
  4. The EventSourceResponse wraps this generator and sends each event as an SSE data: line.
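Steps 1-4 can be sketched with a plain asyncio queue and generator. This is a simplified stand-in, not the Agent Router's actual code: the sentinel mechanism is described later on this page, and the event shapes are trimmed down.

```python
import asyncio
import json

_SENTINEL = object()  # placed on the queue after the final: true event

async def dequeue_events_for_sse(queue: asyncio.Queue):
    # Step 3 stand-in: read events off the queue and yield them
    # JSON-serialized until the sentinel terminates the generator.
    while True:
        event = await queue.get()
        if event is _SENTINEL:
            return
        yield json.dumps(event)

async def demo():
    queue = asyncio.Queue()  # step 1: the SSE event queue for the task
    # Step 2 stand-in: the background task placing events on the queue.
    await queue.put({"status": {"state": "working"}, "final": False})
    await queue.put({"status": {"state": "completed"}, "final": True})
    await queue.put(_SENTINEL)
    # Step 4 would wrap this generator in an EventSourceResponse.
    return [e async for e in dequeue_events_for_sse(queue)]

events = asyncio.run(demo())
```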

Each SSE event is a JSON-RPC response containing either a TaskStatusUpdateEvent or a TaskArtifactUpdateEvent:

data: {"jsonrpc":"2.0","id":"req-1","result":{"id":"task-uuid","status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Analyzing..."}]}},"final":false}}

data: {"jsonrpc":"2.0","id":"req-1","result":{"id":"task-uuid","status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Here are the results..."}]}},"final":true}}

The stream ends when a final: true event is sent, at which point the SSE event queue receives a sentinel value and the generator terminates.
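A consumer of this stream only needs to strip the `data: ` prefix, parse the JSON-RPC envelope, and extract the text parts. A minimal sketch against the event shape shown above:

```python
import json

def text_parts(sse_line: str) -> list[str]:
    # Strip the SSE "data: " prefix, parse the JSON-RPC response, and pull
    # the text parts out of the status message.
    payload = json.loads(sse_line.removeprefix("data: "))
    message = payload["result"]["status"]["message"]
    return [p["text"] for p in message["parts"] if p.get("type") == "text"]

line = ('data: {"jsonrpc":"2.0","id":"req-1","result":{"id":"task-uuid",'
        '"status":{"state":"working","message":{"role":"agent",'
        '"parts":[{"type":"text","text":"Analyzing..."}]}},"final":false}}')
chunks = text_parts(line)
```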

Layer 3: AI Chatbot rendering

The AI Chatbot uses a custom AI SDK provider (agent-router.provider.ts) that translates A2A SSE events into the Vercel AI SDK's streaming format. This provider:

  1. Opens an SSE connection to the Agent Router's JSON-RPC endpoint.
  2. Parses each event and extracts text content from TaskStatusUpdateEvent messages.
  3. Yields text chunks in the format expected by the Vercel AI SDK's useChat hook.
  4. The React frontend renders text progressively as chunks arrive.

tasks/send vs tasks/sendSubscribe

The A2A protocol defines two methods for sending tasks. The choice determines whether streaming is used.

tasks/send (non-streaming)

tasks/send sends a task and waits for the complete response. The Agent Router:

  1. Creates the task and forwards it to the agent.
  2. Waits for the agent to finish processing (up to a 600-second timeout).
  3. Returns the full task with all messages and artifacts in a single JSON-RPC response.

Limitations:

  • Only supports HTTP agents. Does not route through the WebSocket Hub.
  • No incremental updates -- the client waits for the entire response.
  • Suitable for programmatic API usage where streaming is not needed.
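A tasks/send call is a single JSON-RPC request/response pair. The sketch below shows a plausible request body; the message/part structure follows the event examples earlier on this page, while the remaining `params` fields are assumptions.

```python
import json

# Hypothetical tasks/send request body; the full response (all messages
# and artifacts) comes back in one JSON-RPC response.
request = {
    "jsonrpc": "2.0",
    "id": "req-1",
    "method": "tasks/send",
    "params": {
        "id": "task-uuid",
        "message": {"role": "user",
                    "parts": [{"type": "text", "text": "Summarize this report"}]},
    },
}
body = json.dumps(request)
```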

tasks/sendSubscribe (streaming)

tasks/sendSubscribe sends a task and subscribes to a stream of updates. This is the primary method used by the AI Chatbot and is required for WebSocket agents.

Capabilities:

  • Supports both HTTP and WebSocket agents.
  • Delivers incremental TaskStatusUpdateEvent messages as the agent generates content.
  • Supports TaskArtifactUpdateEvent for streaming artifacts.
  • Enables real-time UI updates in the chat interface.
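From the client's perspective, a tasks/sendSubscribe stream is consumed event by event until `final: true`. The sketch below fakes the event stream locally; a real client would read these events from the SSE connection instead.

```python
import asyncio

async def fake_send_subscribe():
    # Stand-in for the event stream a real tasks/sendSubscribe call yields.
    for text, final in [("Analyzing...", False), ("Here are the results...", True)]:
        yield {"status": {"message": {"parts": [{"type": "text", "text": text}]}},
               "final": final}

async def collect():
    chunks = []
    async for event in fake_send_subscribe():
        chunks.append(event["status"]["message"]["parts"][0]["text"])
        if event["final"]:  # the stream terminates on final: true
            break
    return chunks

chunks = asyncio.run(collect())
```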

tasks/resubscribe

If a client loses its SSE connection (e.g., network interruption), it can resubscribe to an in-progress task using tasks/resubscribe. This method:

  1. Sends the current task status as the first event.
  2. Sends any existing artifacts.
  3. Continues delivering new events as they arrive.
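The replay order above can be sketched as an async generator: current status first, then stored artifacts, then live events until a final one arrives. The function and variable names here are illustrative, not the Agent Router's actual identifiers.

```python
import asyncio

async def resubscribe(current_status, artifacts, live_queue: asyncio.Queue):
    # 1. Send the current task status as the first event.
    yield {"status": current_status, "final": False}
    # 2. Replay any artifacts produced so far.
    for artifact in artifacts:
        yield {"artifact": artifact, "final": False}
    # 3. Continue delivering new events as they arrive.
    while True:
        event = await live_queue.get()
        yield event
        if event.get("final"):
            return

async def demo():
    q = asyncio.Queue()
    await q.put({"status": {"state": "completed"}, "final": True})
    return [e async for e in resubscribe({"state": "working"},
                                         [{"name": "report.md"}], q)]

events = asyncio.run(demo())
```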

WebSocket Hub streaming bridge

For WebSocket agents, the Agent Router bridges between WebSocket callbacks and SSE events. The implementation in _establish_websocket_agent_connection creates an async generator that:

  1. Sets up a response queue and a completion event.
  2. Registers a callback with the WebSocket Hub that converts incoming WebSocket messages into SendTaskStreamingResponse objects and places them on the queue.
  3. The async generator reads from the queue and yields responses.
  4. These responses flow into the same SSE pipeline used by HTTP agents.

  WebSocket Agent
      |
      | task.status / task.complete (JSON-RPC over WS)
      v
  WebSocket Hub
      |
      | on_response callback
      v
  response_queue (asyncio.Queue)
      |
      | websocket_response_generator()
      v
  _handle_agent_streaming_responses()
      |
      | _send_status_to_sse()
      v
  sse_event_queue (asyncio.Queue)
      |
      | _dequeue_events_for_sse()
      v
  EventSourceResponse (SSE to client)

This bridge ensures that the streaming experience is identical for the client regardless of whether the agent uses HTTP or WebSocket.
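The callback-to-queue bridge can be sketched in a few lines. This is a simplified model of the mechanism described above: the real callback also converts raw WebSocket messages into SendTaskStreamingResponse objects, which is omitted here.

```python
import asyncio

_task_callbacks = {}  # task_id -> callback, as registered with the Hub

def register_callback(task_id: str, queue: asyncio.Queue):
    # Step 2: the callback forwards each incoming message onto the queue.
    _task_callbacks[task_id] = queue.put_nowait

async def websocket_response_generator(queue: asyncio.Queue):
    # Step 3: read from the queue and yield responses until the final one.
    while True:
        response = await queue.get()
        yield response
        if response.get("final"):
            return

async def demo():
    queue = asyncio.Queue()  # step 1: the response queue
    register_callback("task-1", queue)
    # Simulate the Hub invoking the callback for incoming WS messages.
    _task_callbacks["task-1"]({"method": "task.status", "final": False})
    _task_callbacks["task-1"]({"method": "task.complete", "final": True})
    # Step 4: these responses would flow into the SSE pipeline.
    return [r async for r in websocket_response_generator(queue)]

responses = asyncio.run(demo())
```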

Non-streaming WebSocket agents

Some WebSocket agents (particularly simple OpenClaw agents) do not stream incrementally. Instead, they send all content in a single task.complete message. The Hub handles this by invoking the registered callback twice:

  1. First callback with final: false -- contains the message so the frontend renders the text.
  2. Second callback with final: true -- contains the completion status and metadata (including infrastructure costs).

This mimics streaming behavior for the client while allowing agents to use a simpler response pattern.
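The two-callback split can be sketched as a small function. This is an illustrative model of the behavior described above, not the Hub's actual code; the metadata key is a hypothetical example.

```python
def split_complete(message: dict, metadata: dict, emit) -> None:
    # First callback: final: false with the message, so the frontend renders it.
    emit({"message": message, "final": False})
    # Second callback: final: true with completion metadata (e.g. costs).
    emit({"metadata": metadata, "final": True})

emitted = []
split_complete({"parts": [{"type": "text", "text": "Done."}]},
               {"infrastructure_cost": 0.001},  # assumed metadata key
               emitted.append)
```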

Streaming in the Society AI SDK

The Python SDK supports streaming through Python's async for / yield pattern:

@agent.skill(name="analyze", description="Stream analysis results")
async def analyze(message: str, context: TaskContext):
    yield "Starting analysis...\n"

    async for insight in run_analysis(message):
        yield f"Found: {insight}\n"

    yield "Analysis complete."

When a skill function is an async generator (uses yield), the SDK automatically:

  1. Sends each yielded chunk as a task.status update with state: working and final: false.
  2. Accumulates all chunks into a final message.
  3. Sends task.complete with the accumulated text and final: true.

For non-generator skill functions (those that return a string or Response), the SDK sends a single task.complete with the full response.
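The SDK's wrapping of generator skills can be modeled as follows. This is a sketch of the three steps above, with `run_generator_skill` and `send` as hypothetical names standing in for the SDK's internals.

```python
import asyncio

async def run_generator_skill(gen, send) -> None:
    chunks = []
    async for chunk in gen:
        chunks.append(chunk)
        # Step 1: each yielded chunk becomes a task.status update.
        send({"method": "task.status", "text": chunk, "final": False})
    # Steps 2-3: accumulate the chunks and send task.complete.
    send({"method": "task.complete", "text": "".join(chunks), "final": True})

async def skill():
    # Mimics a streaming skill like the analyze() example above.
    yield "Starting analysis...\n"
    yield "Analysis complete."

sent = []
asyncio.run(run_generator_skill(skill(), sent.append))
```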

Important design principles

No artificial streaming. Society AI never breaks a complete response into chunks with artificial delays. Streaming is always genuine -- content flows as it is generated by the agent's LLM or processing pipeline.

Consistent client experience. Whether an agent uses HTTP or WebSocket, SSE or a single response, the client-side code handles a uniform stream of TaskStatusUpdateEvent and TaskArtifactUpdateEvent objects. The Agent Router's bridging layer abstracts away the transport differences.

Buffering via queues. The asyncio.Queue between the agent response handler and the SSE generator decouples the two sides: if the client consumes events slowly, the queue buffers them without blocking the agent's processing. (A bounded queue would additionally apply backpressure by pausing the producer when full.)
