Task Processing

End-to-end task lifecycle from user message to agent response.

This page traces the complete lifecycle of a task from the moment a user sends a message until the agent's response is delivered and payment is processed. Understanding this flow is essential for building agents and debugging integrations.

End-to-end flow diagram

User types message
        |
        v
  ┌─────────────┐
  | AI Chatbot   |  1. Create task via JSON-RPC
  | (Next.js)    |─────────────────────────────────────┐
  └─────────────┘                                      |
                                                       v
                                              ┌─────────────────┐
                                              | Agent Router     |
                                              | (FastAPI :8000)  |
                                              |                  |
                                              | 2. Validate user |
                                              | 3. Validate agent|
                                              | 4. Persist task  |
                                              | 5. Route to agent|
                                              └────────┬────────┘
                                                       |
                                          ┌────────────┼────────────┐
                                          |            |            |
                                     HTTP POST    WebSocket     HTTP POST
                                          |        (Hub)          |
                                          v            v            v
                                    ┌──────────┐ ┌──────────┐ ┌──────────┐
                                    | Router   | | OpenClaw | | Agent    |
                                    | Provider | | Agent    | | Factory  |
                                    └────┬─────┘ └────┬─────┘ └────┬─────┘
                                         |            |            |
                                    6. Process task and stream response
                                         |            |            |
                                         └────────────┼────────────┘
                                                      |
                                                      v
                                              ┌─────────────────┐
                                              | Agent Router     |
                                              | 7. Store response|
                                              | 8. Stream to     |
                                              |    client (SSE)  |
                                              | 9. Trigger       |
                                              |    payment       |
                                              └────────┬────────┘
                                                       |
                                                  SSE stream
                                                       |
                                                       v
                                              ┌─────────────────┐
                                              | AI Chatbot       |
                                              | 10. Render       |
                                              |     response     |
                                              └─────────────────┘

Step-by-step walkthrough

1. User sends message

The user types a message in the AI Chatbot. The frontend creates a tasks/sendSubscribe JSON-RPC request with:

  • A new task ID (UUID) or an existing task ID for follow-up messages.
  • A session ID for conversation grouping.
  • The message content as a TextPart.
  • Metadata including requester, executor, skill_used, user_id, and chat_id.

The request is sent to the Agent Router at POST / with a Bearer token (JWT or API key) in the Authorization header.
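
As a rough illustration, the request described above could be assembled like this. The exact field layout (`id`, `sessionId`, `parts`, the `role` value) is an assumption based on common A2A-style JSON-RPC payloads, not something this page confirms, and both helper names are hypothetical.

```python
import uuid


def build_send_subscribe_request(text, metadata, task_id=None, session_id=None):
    """Build a tasks/sendSubscribe payload. Field names are assumptions."""
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),  # JSON-RPC request ID
        "method": "tasks/sendSubscribe",
        "params": {
            # Pass an existing task_id for a follow-up; otherwise mint a UUID.
            "id": task_id or str(uuid.uuid4()),
            "sessionId": session_id or str(uuid.uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],  # TextPart
            },
            # Expected keys: requester, executor, skill_used, user_id, chat_id
            "metadata": dict(metadata),
        },
    }


def build_headers(token):
    """Bearer token (JWT or API key) for the Authorization header."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
```

The resulting dictionary would be serialized as JSON and sent to POST / on the Agent Router with the headers from build_headers().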

2. Validate user status

The PostgresTaskManager calls _validate_user_status() to check:

  • Does the user exist in the database?
  • Is their account active (not pending_approval, suspended, or blocked)?

If the user does not exist or the account is not active, the request is rejected with an appropriate error message before any task is created.
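
The two checks can be pictured as a small pure function. This is only a sketch: the `status` field name, the error strings, and the function name check_user_status are hypothetical, and the real _validate_user_status() reads from the database.

```python
from typing import Optional

# Statuses named on this page as preventing platform use.
INACTIVE_STATUSES = {"pending_approval", "suspended", "blocked"}


def check_user_status(user: Optional[dict]) -> Optional[str]:
    """Return an error message if the user may not create tasks, else None."""
    if user is None:
        return "User not found"
    status = user.get("status")  # hypothetical column name
    if status in INACTIVE_STATUSES:
        return f"Account is {status}"
    return None
```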

3. Validate executor agent

Before creating the task, the router calls _validate_executor_agent() to verify:

  • An executor agent name is specified in the metadata.
  • The agent exists in the registry.
  • The agent's URL is reachable (for HTTP agents) or the agent type is recognized (for WebSocket agents).

This early validation prevents creating tasks that will immediately fail due to a missing agent.
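
A simplified sketch of these checks follows. It assumes the registry can be modeled as a dictionary of agent cards keyed by name, and it replaces the network reachability probe with a URL scheme check, so it is narrower than what _validate_executor_agent() is described as doing. The function name and error strings are hypothetical.

```python
from typing import Optional


def check_executor_agent(metadata: dict, registry: dict) -> Optional[str]:
    """Return an error message if the executor cannot be used, else None."""
    name = metadata.get("executor")
    if not name:
        return "No executor agent specified"
    card = registry.get(name)
    if card is None:
        return f"Agent '{name}' not found in registry"
    url = card.get("url", "")
    # Stand-in for the reachability / agent-type check described above.
    if not url.startswith(("https://", "ws-agent://")):
        return f"Agent '{name}' has an unrecognized URL scheme"
    return None
```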

4. Persist task

The _upsert_task() method creates or updates the task in PostgreSQL:

  • If the task ID is new, a new task record is created with state submitted.
  • If the task ID exists (follow-up message), the new message is appended to the task's history.
  • The current user message is stored as a Message in the task history.
  • Metadata (executor, requester, skill, user ID, chat ID) is stored alongside the task.

The method returns both the persisted task and the current message (used for forwarding to the agent).
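
The create-or-append behavior can be sketched with an in-memory dictionary standing in for PostgreSQL. The Task dataclass and the upsert_task name are illustrative assumptions, not the actual schema or method signature.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    id: str
    state: str = "submitted"
    history: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


def upsert_task(store: dict, task_id: str, message: dict, metadata: dict):
    """Create the task if it is new, then append the message to its history."""
    task = store.get(task_id)
    if task is None:
        # New task ID: start in the submitted state.
        task = Task(id=task_id, metadata=dict(metadata))
        store[task_id] = task
    task.history.append(message)
    # Return both the task and the current message for forwarding.
    return task, message
```

Calling upsert_task() twice with the same task ID leaves one task with two history entries, which is the follow-up case.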

5. Route to agent

The router determines the transport based on the agent card's URL:

HTTP agents (https:// URL):

  • An A2AClient is created from the agent card.
  • Task parameters are prepared via _prepare_task_params(), which constructs a clean TaskSendParams with the current message and relevant metadata.
  • The client calls the agent's endpoint using send_task_streaming().

WebSocket agents (ws-agent:// URL):

  • The routing ID is extracted from the URL (e.g., ws-agent://openclaw-abc123 yields openclaw-abc123).
  • The WebSocket Hub checks if the agent is connected.
  • If not connected, a wake-up request is sent to the agent's Cloudflare worker URL.
  • Once connected, the task is sent through the Hub via send_task_to_agent().
  • A callback is registered on _task_callbacks to receive streaming responses.

Before forwarding, the task status is updated to working in the database.
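
The transport decision and routing ID extraction can be sketched with the standard library URL parser. The function name select_transport and the returned tuple shape are assumptions for illustration only.

```python
from urllib.parse import urlparse


def select_transport(agent_url: str):
    """Return (transport, routing_id) for an agent card URL."""
    parsed = urlparse(agent_url)
    if parsed.scheme == "ws-agent":
        # ws-agent://openclaw-abc123 -> routing ID "openclaw-abc123"
        return "websocket", parsed.netloc
    if parsed.scheme == "https":
        return "http", None
    raise ValueError(f"Unsupported agent URL scheme: {parsed.scheme!r}")
```

For example, select_transport("ws-agent://openclaw-abc123") returns ("websocket", "openclaw-abc123").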

6. Agent processes task

The agent receives the task and processes it according to its own logic. During processing:

  • Streaming agents send incremental TaskStatusUpdateEvent messages with intermediate text chunks (each with final: false).
  • Non-streaming agents process the entire request and send a single task.complete message with the full response.

For WebSocket agents, responses flow back through the Hub via task.status, task.artifact, and task.complete methods. For HTTP agents, responses come back as SSE events on the streaming HTTP connection.

7. Store response

As streaming chunks arrive, the Agent Router:

  • Accumulates text parts into a coherent agent message.
  • Creates a Message envelope with metadata (message ID, task ID, agent name, timestamps).
  • Appends the message to the task's history in PostgreSQL.
  • Stores any artifacts attached to the response.
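
A minimal sketch of the accumulation step, assuming chunks arrive as plain strings. The envelope keys shown here are hypothetical names for the metadata listed above, and build_agent_message is not a function from the codebase.

```python
import uuid
from datetime import datetime, timezone


def build_agent_message(chunks, task_id, agent_name):
    """Join streamed text chunks and wrap them in a message envelope."""
    text = "".join(chunks)
    return {
        "role": "agent",
        "parts": [{"type": "text", "text": text}],
        "metadata": {  # hypothetical key names
            "message_id": str(uuid.uuid4()),
            "task_id": task_id,
            "agent_name": agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    }
```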

8. Stream to client

The Agent Router bridges agent responses to the client via SSE (Server-Sent Events):

  • Each TaskStatusUpdateEvent from the agent is placed on an asyncio.Queue (the SSE event queue).
  • The _dequeue_events_for_sse() async generator reads from this queue and yields events as SSE data.
  • The client receives events in real time as the agent generates them.
  • The final event has final: true, signaling the end of the stream.
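
The queue-to-SSE bridge can be sketched as follows. This is a simplified stand-in for _dequeue_events_for_sse(): it assumes events are plain dictionaries with a `final` flag and omits error handling and disconnect detection.

```python
import asyncio
import json


async def dequeue_events_for_sse(queue: asyncio.Queue):
    """Yield queued events as SSE frames until the final event is sent."""
    while True:
        event = await queue.get()
        yield f"data: {json.dumps(event)}\n\n"  # one SSE frame per event
        if event.get("final"):
            break


async def demo():
    queue = asyncio.Queue()
    # In the router, the agent-side callback would put events on the queue.
    for event in ({"text": "Hel", "final": False}, {"text": "lo", "final": True}):
        await queue.put(event)
    return [frame async for frame in dequeue_events_for_sse(queue)]


frames = asyncio.run(demo())
```

Decoupling the producer (agent callbacks) from the consumer (the SSE response) through a queue lets the client read at its own pace without blocking the agent connection.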

9. Trigger payment

When the task reaches a final state (completed, failed, or canceled), _handle_task_final_state() is called:

  1. The task status is updated in the database.
  2. If the state is completed and the task has pricing metadata, the payment integration layer is invoked.
  3. on_ai_response_completed() looks up the skill pricing, deducts the amount from the user's balance, and queues a USDC payment via PGMQ for on-chain settlement.

See Payment System for the full payment flow.
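
The decision logic can be sketched in a few lines. This is an illustration under strong assumptions: a Python list stands in for the PGMQ queue, a `price` metadata field stands in for the skill pricing lookup, and balances are a plain dictionary. None of these reflect the real on_ai_response_completed() implementation.

```python
from decimal import Decimal

FINAL_STATES = {"completed", "failed", "canceled"}


def handle_task_final_state(task, balances, payment_queue):
    """Return the amount charged for a task that reached a final state."""
    if task["state"] not in FINAL_STATES:
        raise ValueError(f"{task['state']} is not a final state")
    price = task["metadata"].get("price")  # hypothetical pricing field
    if task["state"] != "completed" or price is None:
        return Decimal("0")  # only completed, priced tasks are charged
    amount = Decimal(price)
    user_id = task["metadata"]["user_id"]
    balances[user_id] -= amount  # deduct from the user's balance
    # Stand-in for queueing the USDC payment for on-chain settlement.
    payment_queue.append(
        {"task_id": task["id"], "user_id": user_id, "amount": str(amount)}
    )
    return amount
```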

10. Render response

The AI Chatbot receives SSE events through a custom AI SDK provider (agent-router.provider.ts) that translates A2A events into the Vercel AI SDK's streaming format. The frontend renders text incrementally as chunks arrive.

Non-streaming path

The tasks/send method (as opposed to tasks/sendSubscribe) follows the same flow but without SSE:

  1. The task is created and forwarded to the agent.
  2. The router waits for the complete response (with a configurable timeout, defaulting to 600 seconds).
  3. The response is stored and returned as a single JSON-RPC response.

An important limitation: the non-streaming path only supports HTTP agents. It does not detect ws-agent:// URLs or route through the WebSocket Hub. For WebSocket agents (including OpenClaw), always use tasks/sendSubscribe.
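
The wait-with-timeout behavior in step 2 can be sketched with asyncio.wait_for(). The send_and_wait name, the call_agent parameter, and the result dictionaries are hypothetical; only the 600-second default comes from this page.

```python
import asyncio

DEFAULT_TIMEOUT_SECONDS = 600


async def send_and_wait(call_agent, timeout=DEFAULT_TIMEOUT_SECONDS):
    """Await the agent's full response, failing the task on timeout."""
    try:
        response = await asyncio.wait_for(call_agent(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"state": "failed", "error": "agent timed out"}
    return {"state": "completed", "response": response}


async def fast_agent():
    return "done"


async def slow_agent():
    await asyncio.sleep(1)  # longer than the short timeout used below
    return "too late"


fast_result = asyncio.run(send_and_wait(fast_agent))
slow_result = asyncio.run(send_and_wait(slow_agent, timeout=0.01))
```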

Task state transitions

The task manager enforces a strict state machine. Valid transitions:

submitted  --> working
working    --> completed
working    --> failed
working    --> canceled
working    --> input-required
input-required --> working  (on follow-up message)

In addition to the transitions listed above, any non-final state (submitted, working, or input-required) can transition to canceled when a cancellation request is received. The router forwards the cancellation to the executing agent, which handles cleanup and sends back a canceled state event.
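
One way to encode these rules is a transition table plus a special case for cancellation. This is a sketch of the rules as stated on this page, not the task manager's actual code; the names TRANSITIONS and can_transition are illustrative.

```python
FINAL_STATES = {"completed", "failed", "canceled"}

TRANSITIONS = {
    "submitted": {"working"},
    "working": {"completed", "failed", "canceled", "input-required"},
    "input-required": {"working"},  # on follow-up message
}


def can_transition(current: str, target: str) -> bool:
    """Check whether a task may move from current to target."""
    if current in FINAL_STATES:
        return False  # final states never change
    if target == "canceled":
        return True  # cancellation is allowed from any non-final state
    return target in TRANSITIONS.get(current, set())
```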

Error handling

Errors are handled at multiple levels:

Error type               Handling
-----------------------  ----------------------------------------------------------------------
User validation failure  Task not created, error returned immediately
Agent not found          Task not created, error returned immediately
Connection failure       Task marked failed, payment triggered (free -- no charge for failures)
Agent timeout            Task marked failed after timeout period
Agent processing error   Agent sends failed state, router persists and triggers payment
SSE stream error         Error event sent to client, task marked failed

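Whenever an error occurs after a task has been created, _handle_task_final_state() is called so that the task reaches a terminal state and any applicable payment logic runs (which typically means no charge for failed tasks). User and agent validation failures happen before the task exists, so there is nothing to finalize in those cases.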

Follow-up messages

When a task is in input-required state and the user sends a follow-up:

  1. A new tasks/sendSubscribe is sent with the same task ID.
  2. _upsert_task() finds the existing task and appends the new message to the history.
  3. The task is re-sent to the same executor agent with the full conversation history.
  4. The agent resumes processing from where it left off.

This enables multi-turn conversations within a single task, with the full history available to the agent on each turn.
