Streaming & Real-time
Build real-time applications with session.stream() — token-by-token output, tool progress, and event-driven UIs.
This tutorial covers building real-time applications using session.stream().
You'll learn to handle text deltas, tool execution events, and build
responsive UIs that show agent progress.
Basic Streaming
    import { Agent } from '@a3s-lab/code';

    const agent = await Agent.create('config.acl');
    const session = agent.session('.', {
      permissionPolicy: { defaultDecision: 'allow' },
    });

    const stream = await session.stream('Explain how async/await works in Rust');
    for await (const ev of stream) {
      if (ev.type === 'text_delta') {
        process.stdout.write(ev.text);
      }
    }
Event Types
| Event | When | Key Fields |
| --- | --- | --- |
| text_delta | LLM produces text | text |
| tool_start | Tool execution begins | toolName, toolId |
| tool_output_delta | Tool streams output | toolName, text |
| tool_end | Tool execution completes | toolName, toolOutput, exitCode |
| turn_end | One LLM turn completes | turn, totalTokens |
| end | Full execution completes | text, totalTokens |
| error | Error occurred | error |
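The table above maps naturally onto a TypeScript discriminated union, which lets the compiler check that each handler reads only fields that exist on that event. The sketch below is an assumption, not the SDK's published types: the field names come from the table, but the field types and the names `StreamEvent` and `describeEvent` are hypothetical.

```typescript
// Hypothetical event shapes inferred from the table above.
// Field types are assumptions; check the SDK's own type definitions.
type StreamEvent =
  | { type: 'text_delta'; text: string }
  | { type: 'tool_start'; toolName: string; toolId: string }
  | { type: 'tool_output_delta'; toolName: string; text: string }
  | { type: 'tool_end'; toolName: string; toolOutput: string; exitCode: number }
  | { type: 'turn_end'; turn: number; totalTokens: number }
  | { type: 'end'; text: string; totalTokens: number }
  | { type: 'error'; error: string };

// Returns a one-line, human-readable summary of an event.
function describeEvent(ev: StreamEvent): string {
  switch (ev.type) {
    case 'text_delta':
      return `text: ${ev.text}`;
    case 'tool_start':
      return `tool ${ev.toolName} started`;
    case 'tool_output_delta':
      return `tool ${ev.toolName} output: ${ev.text}`;
    case 'tool_end':
      return `tool ${ev.toolName} exited with ${ev.exitCode}`;
    case 'turn_end':
      return `turn ${ev.turn} ended (${ev.totalTokens} tokens)`;
    case 'end':
      return `done (${ev.totalTokens} tokens)`;
    case 'error':
      return `error: ${ev.error}`;
    default: {
      // Exhaustiveness check: this stops compiling if a variant is left unhandled.
      const unreachable: never = ev;
      return unreachable;
    }
  }
}
```

Inside each `case`, `ev` is narrowed to the matching variant, so reading `ev.exitCode` on a `text_delta` event becomes a compile-time error rather than a runtime `undefined`.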
Building a Chat UI
    // appendToChat, showToolIndicator, hideToolIndicator, showToolError, and
    // markComplete are your application's own UI hooks.
    async function handleUserMessage(session, message: string) {
      const stream = await session.stream(message);
      const events = [];

      for await (const ev of stream) {
        switch (ev.type) {
          case 'text_delta':
            appendToChat(ev.text);
            break;
          case 'tool_start':
            showToolIndicator(ev.toolName);
            break;
          case 'tool_end':
            hideToolIndicator(ev.toolName);
            if (ev.exitCode !== 0) {
              showToolError(ev.toolName, ev.toolOutput);
            }
            break;
          case 'end':
            markComplete(ev.totalTokens);
            break;
        }
        events.push(ev);
      }

      return events;
    }
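One way to keep a handler like the one above testable is to separate state updates from UI calls: fold each event into a plain state object, then render from that object. The sketch below is illustrative only. `ChatEvent`, `ChatState`, `initialState`, and `applyEvent` are hypothetical names, and the event shapes are a subset inferred from the event table, with assumed field types.

```typescript
// Subset of the event shapes, inferred from the event table (types are assumptions).
type ChatEvent =
  | { type: 'text_delta'; text: string }
  | { type: 'tool_start'; toolName: string }
  | { type: 'tool_end'; toolName: string; toolOutput: string; exitCode: number }
  | { type: 'end'; totalTokens: number };

interface ChatState {
  text: string;               // assistant text accumulated so far
  activeTools: string[];      // tools currently running
  failedTools: string[];      // tools that exited with a non-zero code
  done: boolean;
  totalTokens: number | null; // null until the 'end' event arrives
}

const initialState: ChatState = {
  text: '',
  activeTools: [],
  failedTools: [],
  done: false,
  totalTokens: null,
};

// Pure reducer: returns a new state and never mutates its input.
function applyEvent(state: ChatState, ev: ChatEvent): ChatState {
  switch (ev.type) {
    case 'text_delta':
      return { ...state, text: state.text + ev.text };
    case 'tool_start':
      return { ...state, activeTools: [...state.activeTools, ev.toolName] };
    case 'tool_end': {
      // Remove a single matching entry so overlapping runs of one tool stay tracked.
      const i = state.activeTools.indexOf(ev.toolName);
      const activeTools =
        i === -1 ? state.activeTools : state.activeTools.filter((_, j) => j !== i);
      const failedTools =
        ev.exitCode !== 0 ? [...state.failedTools, ev.toolName] : state.failedTools;
      return { ...state, activeTools, failedTools };
    }
    case 'end':
      return { ...state, done: true, totalTokens: ev.totalTokens };
  }
}
```

Inside the `for await` loop this would be used as `state = applyEvent(state, ev)` followed by a render call. A recorded event list can be replayed with `events.reduce(applyEvent, initialState)`.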
Streaming with Tool Progress
When the agent runs a tool, the stream reports its progress through tool_start, tool_output_delta, and tool_end events:
    const stream = await session.stream('Read package.json and list all dependencies');

    for await (const ev of stream) {
      if (ev.type === 'tool_start') {
        console.log(`[tool] Starting: ${ev.toolName}`);
      }
      if (ev.type === 'tool_output_delta') {
        // Bash tool streams stdout line by line
        console.log(`[${ev.toolName}] ${ev.text}`);
      }
      if (ev.type === 'tool_end') {
        console.log(`[tool] Done: ${ev.toolName} (exit: ${ev.exitCode})`);
      }
      if (ev.type === 'text_delta') {
        process.stdout.write(ev.text);
      }
    }
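For logging or debugging, the same tool events can be condensed into one record per tool run. The sketch below works on an already recorded list of events. `ToolEvent`, `ToolRun`, and `summarizeToolRuns` are hypothetical names, and the field types are assumptions based on the event table. Because the table lists `toolId` only on `tool_start`, the sketch matches later events to a run by `toolName`, which is a simplification.

```typescript
// Subset of event shapes inferred from the event table (types are assumptions).
type ToolEvent =
  | { type: 'tool_start'; toolName: string; toolId: string }
  | { type: 'tool_output_delta'; toolName: string; text: string }
  | { type: 'tool_end'; toolName: string; toolOutput: string; exitCode: number };

interface ToolRun {
  toolName: string;
  chunks: number;          // number of tool_output_delta events seen
  exitCode: number | null; // null while the tool has not finished
}

// Builds one ToolRun per tool_start, in start order.
function summarizeToolRuns(events: ToolEvent[]): ToolRun[] {
  const runs: ToolRun[] = [];

  // Finds the most recent unfinished run for a given tool name.
  const findOpen = (name: string): ToolRun | undefined => {
    for (let i = runs.length - 1; i >= 0; i--) {
      const run = runs[i];
      if (run && run.toolName === name && run.exitCode === null) return run;
    }
    return undefined;
  };

  for (const ev of events) {
    if (ev.type === 'tool_start') {
      runs.push({ toolName: ev.toolName, chunks: 0, exitCode: null });
    } else if (ev.type === 'tool_output_delta') {
      const run = findOpen(ev.toolName);
      if (run) run.chunks += 1;
    } else {
      // tool_end: close the matching run
      const run = findOpen(ev.toolName);
      if (run) run.exitCode = ev.exitCode;
    }
  }
  return runs;
}
```

A run whose `exitCode` is still `null` after the stream ends never received its `tool_end`, for example because the stream was cancelled.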
Streaming Structured Output
Combine streaming with generate_object for progressive JSON rendering:
    const stream = await session.stream(
      'Analyze this codebase and use generate_object to produce a structured report'
    );
    let partialReport = null;

    for await (const ev of stream) {
      if (ev.type === 'text_delta') {
        showThinking(ev.text); // agent's reasoning
      }
      if (ev.type === 'tool_output_delta' && ev.toolName === 'generate_object') {
        partialReport = JSON.parse(ev.text).object_partial;
        renderReport(partialReport); // progressive UI update
      }
      if (ev.type === 'tool_end' && ev.toolName === 'generate_object') {
        const final = JSON.parse(ev.toolOutput).object;
        renderReport(final); // final validated result
      }
    }
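The example above calls `JSON.parse` on every delta, and `JSON.parse` throws on incomplete or malformed input. This page does not say whether each delta is guaranteed to be a complete JSON document, so a defensive parser is a cheap safeguard. The helper name `parsePartial` is hypothetical; the `object_partial` field name is taken from the example above.

```typescript
// Parses one tool_output_delta payload and returns its object_partial field,
// or null when the payload is not valid JSON or does not carry that field.
function parsePartial(payload: string): unknown {
  try {
    const parsed: unknown = JSON.parse(payload);
    if (typeof parsed === 'object' && parsed !== null && 'object_partial' in parsed) {
      return (parsed as { object_partial: unknown }).object_partial;
    }
    return null;
  } catch {
    // Incomplete or malformed JSON: the caller can keep the previous render.
    return null;
  }
}
```

In the loop, `const next = parsePartial(ev.text); if (next !== null) renderReport(next);` skips a bad chunk instead of aborting the whole stream.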
Cancellation
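    const stream = await session.stream('Long running analysis...');

    // Cancel after 10 seconds
    const timer = setTimeout(() => stream.cancel(), 10000);

    try {
      for await (const ev of stream) {
        // No further events arrive after cancel()
      }
    } finally {
      // Don't leave the timer pending if the stream finishes first
      clearTimeout(timer);
    }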
Server-Sent Events (SSE) Pattern
For web applications, forward stream events as SSE:
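    // Express handler (req.body requires express.json()).
    // With Fastify, write to reply.raw instead of res.
    app.post('/api/chat', async (req, res) => {
      res.setHeader('Content-Type', 'text/event-stream');
      res.setHeader('Cache-Control', 'no-cache');

      const stream = await session.stream(req.body.message);

      // Stop the agent if the client disconnects before the stream finishes
      let finished = false;
      res.on('close', () => {
        if (!finished) stream.cancel();
      });

      for await (const ev of stream) {
        res.write(`data: ${JSON.stringify(ev)}\n\n`);
      }
      finished = true;
      res.end();
    });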
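An SSE frame is a block of `field: value` lines terminated by a blank line. The handler above sends only a `data:` line; adding an `event:` line lets clients dispatch on the event type before parsing the payload. The sketch below shows one possible framing and a matching client-side parser. `toSseFrame` and `parseSseFrame` are hypothetical helpers, not part of the SDK.

```typescript
// Formats one SSE frame for an event object that has a `type` field.
function toSseFrame<T extends { type: string }>(ev: T): string {
  // JSON.stringify escapes newlines, so the payload always fits on one data: line.
  return `event: ${ev.type}\ndata: ${JSON.stringify(ev)}\n\n`;
}

// Client-side counterpart: parses one frame produced by toSseFrame.
function parseSseFrame(frame: string): { event: string; data: unknown } {
  let event = 'message'; // SSE default when no event: line is present
  const dataLines: string[] = [];
  for (const line of frame.split('\n')) {
    if (line.startsWith('event:')) {
      event = line.slice('event:'.length).replace(/^ /, '');
    } else if (line.startsWith('data:')) {
      // SSE strips one optional space after the colon.
      dataLines.push(line.slice('data:'.length).replace(/^ /, ''));
    }
  }
  return { event, data: JSON.parse(dataLines.join('\n')) };
}
```

On the server this would replace the template string in `res.write(...)` with `res.write(toSseFrame(ev))`. The browser `EventSource` API only issues GET requests, so a POST endpoint like the one above has to be consumed with `fetch` and a stream reader, which is where a parser along these lines becomes useful.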
Python
    from a3s_code import Agent, SessionOptions, PermissionPolicy

    agent = Agent.create(open('config.acl').read())
    opts = SessionOptions()
    opts.permission_policy = PermissionPolicy(default_decision="allow")
    session = agent.session('.', opts)

    for event in session.stream("Explain Python generators"):
        if event.type == "text_delta":
            print(event.text, end="", flush=True)
        elif event.type == "end":
            print(f"\n[Done: {event.total_tokens} tokens]")