Streaming & Real-time

Build real-time applications with session.stream() — token-by-token output, tool progress, and event-driven UIs.

This tutorial covers building real-time applications with session.stream(). You'll learn to handle text deltas and tool execution events, and to build responsive UIs that show agent progress.

Basic Streaming

import { Agent } from '@a3s-lab/code';

const agent = await Agent.create('config.acl');
const session = agent.session('.', {
  permissionPolicy: { defaultDecision: 'allow' },
});

const stream = await session.stream('Explain how async/await works in Rust');
for await (const ev of stream) {
  if (ev.type === 'text_delta') {
    process.stdout.write(ev.text);
  }
}

Event Types

| Event | When | Key Fields |
| --- | --- | --- |
| text_delta | LLM produces text | text |
| tool_start | Tool execution begins | toolName, toolId |
| tool_output_delta | Tool streams output | toolName, text |
| tool_end | Tool execution completes | toolName, toolOutput, exitCode |
| turn_end | One LLM turn completes | turn, totalTokens |
| end | Full execution completes | text, totalTokens |
| error | Error occurred | error |
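The table maps naturally onto a TypeScript discriminated union, which lets the compiler narrow the available fields inside each branch. The sketch below is an assumption built only from the table: the field names come from it, but the field types (string, number) and the names StreamEvent and describeEvent are hypothetical, and any types the SDK itself exports should take precedence.

```typescript
// Assumed event shapes, transcribed from the Event Types table.
// Field types are guesses; prefer the SDK's own typings if it ships any.
type StreamEvent =
  | { type: 'text_delta'; text: string }
  | { type: 'tool_start'; toolName: string; toolId: string }
  | { type: 'tool_output_delta'; toolName: string; text: string }
  | { type: 'tool_end'; toolName: string; toolOutput: string; exitCode: number }
  | { type: 'turn_end'; turn: number; totalTokens: number }
  | { type: 'end'; text: string; totalTokens: number }
  | { type: 'error'; error: string };

// Hypothetical helper: turn any event into a one-line log message.
function describeEvent(ev: StreamEvent): string {
  switch (ev.type) {
    case 'text_delta':
      return `text: ${JSON.stringify(ev.text)}`;
    case 'tool_start':
      return `tool ${ev.toolName} started (id ${ev.toolId})`;
    case 'tool_output_delta':
      return `tool ${ev.toolName} output: ${JSON.stringify(ev.text)}`;
    case 'tool_end':
      return `tool ${ev.toolName} finished with exit code ${ev.exitCode}`;
    case 'turn_end':
      return `turn ${ev.turn} ended (${ev.totalTokens} tokens)`;
    case 'end':
      return `finished (${ev.totalTokens} tokens)`;
    case 'error':
      return `error: ${ev.error}`;
    default: {
      // Compile-time exhaustiveness check; at runtime this still
      // reports event types that were added after this code was written.
      const unhandled: never = ev;
      return `unknown event: ${JSON.stringify(unhandled)}`;
    }
  }
}
```

Because every case narrows on ev.type, reading ev.exitCode outside the tool_end branch becomes a compile error instead of a silent undefined.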

Building a Chat UI

async function handleUserMessage(session, message: string) {
  const stream = await session.stream(message);
  const events = [];
  for await (const ev of stream) {
    switch (ev.type) {
      case 'text_delta':
        appendToChat(ev.text);
        break;
      case 'tool_start':
        showToolIndicator(ev.toolName);
        break;
      case 'tool_end':
        hideToolIndicator(ev.toolName);
        if (ev.exitCode !== 0) {
          showToolError(ev.toolName, ev.toolOutput);
        }
        break;
      case 'end':
        markComplete(ev.totalTokens);
        break;
    }
    events.push(ev);
  }
  return events;
}
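An alternative to calling UI functions directly from the loop is to fold events into a plain state object and render from that, which makes the event handling testable without a DOM. The following is a minimal sketch of that idea, not part of the SDK: ChatEvent, ChatState, initialChatState, and applyEvent are hypothetical names, and the field types are assumed from the Event Types table.

```typescript
// Minimal event shape for this sketch; field names follow the Event Types
// table, field types are assumptions.
type ChatEvent =
  | { type: 'text_delta'; text: string }
  | { type: 'tool_start'; toolName: string }
  | { type: 'tool_end'; toolName: string; toolOutput: string; exitCode: number }
  | { type: 'end'; totalTokens: number }
  | { type: 'tool_output_delta' | 'turn_end' | 'error' }; // not displayed here

// Hypothetical view model that a UI layer would render from.
interface ChatState {
  text: string;
  activeTools: string[];
  toolErrors: { toolName: string; output: string }[];
  totalTokens: number | null; // null until the 'end' event arrives
}

const initialChatState: ChatState = {
  text: '',
  activeTools: [],
  toolErrors: [],
  totalTokens: null,
};

// Pure reducer: returns a new state and never mutates the old one.
function applyEvent(state: ChatState, ev: ChatEvent): ChatState {
  switch (ev.type) {
    case 'text_delta':
      return { ...state, text: state.text + ev.text };
    case 'tool_start':
      return { ...state, activeTools: [...state.activeTools, ev.toolName] };
    case 'tool_end': {
      // Remove a single matching entry so two runs of the same tool stay balanced.
      const i = state.activeTools.indexOf(ev.toolName);
      const activeTools =
        i === -1 ? state.activeTools : state.activeTools.filter((_, j) => j !== i);
      const toolErrors =
        ev.exitCode !== 0
          ? [...state.toolErrors, { toolName: ev.toolName, output: ev.toolOutput }]
          : state.toolErrors;
      return { ...state, activeTools, toolErrors };
    }
    case 'end':
      return { ...state, activeTools: [], totalTokens: ev.totalTokens };
    default:
      return state; // events this view does not display
  }
}
```

Inside the for await loop this would be used as state = applyEvent(state, ev) followed by a render call, so the same reducer can replay a recorded event list in a unit test.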

Streaming with Tool Progress

When the agent uses tools, the stream reports each tool's start, incremental output, and completion:

const stream = await session.stream('Read package.json and list all dependencies');
for await (const ev of stream) {
  if (ev.type === 'tool_start') {
    console.log(`[tool] Starting: ${ev.toolName}`);
  }
  if (ev.type === 'tool_output_delta') {
    // Bash tool streams stdout line by line
    console.log(`[${ev.toolName}] ${ev.text}`);
  }
  if (ev.type === 'tool_end') {
    console.log(`[tool] Done: ${ev.toolName} (exit: ${ev.exitCode})`);
  }
  if (ev.type === 'text_delta') {
    process.stdout.write(ev.text);
  }
}

Streaming Structured Output

Combine streaming with generate_object for progressive JSON rendering:

const stream = await session.stream(
  'Analyze this codebase and use generate_object to produce a structured report'
);
let partialReport = null;
for await (const ev of stream) {
  if (ev.type === 'text_delta') {
    showThinking(ev.text); // agent's reasoning
  }
  if (ev.type === 'tool_output_delta' && ev.toolName === 'generate_object') {
    partialReport = JSON.parse(ev.text).object_partial;
    renderReport(partialReport); // progressive UI update
  }
  if (ev.type === 'tool_end' && ev.toolName === 'generate_object') {
    const final = JSON.parse(ev.toolOutput).object;
    renderReport(final); // final validated result
  }
}
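JSON.parse throws on malformed input, so a single unparseable delta would abort the whole loop above. A defensive variant is sketched below. It assumes the payload shape { object_partial: ... } shown in the snippet, and parsePartial is a hypothetical helper name. Whether every tool_output_delta carries a complete JSON document is not stated here, so the helper treats anything it cannot parse as "no update yet".

```typescript
// Hypothetical helper: extract `object_partial` from a delta payload,
// or return undefined when the text is not a usable JSON object.
function parsePartial(raw: string): unknown {
  try {
    const parsed: unknown = JSON.parse(raw);
    if (parsed !== null && typeof parsed === 'object' && 'object_partial' in parsed) {
      return (parsed as { object_partial: unknown }).object_partial;
    }
  } catch {
    // Incomplete or invalid JSON: skip this delta and keep the last good render.
  }
  return undefined;
}
```

In the loop, the caller would update the UI only when parsePartial(ev.text) returns something other than undefined, leaving the previously rendered report in place otherwise.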

Cancellation

const stream = await session.stream('Long running analysis...');
// Cancel after 10 seconds
setTimeout(() => stream.cancel(), 10000);
for await (const ev of stream) {
  // Will stop receiving events after cancel
}
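One detail the snippet above leaves open is the timer itself: if the stream finishes before the deadline, the pending setTimeout still fires later and keeps a Node process alive until it does. A small wrapper can clear it. This is a sketch under assumptions: CancellableStream and consumeWithTimeout are hypothetical names, and it relies on the behavior described above, namely that iteration simply stops after cancel() rather than throwing.

```typescript
// Assumed minimal shape of what session.stream() resolves to:
// an async-iterable that also exposes cancel().
interface CancellableStream<T> extends AsyncIterable<T> {
  cancel(): unknown;
}

// Consume a stream, cancelling it if it runs longer than timeoutMs.
// Resolves to true when the timeout triggered the cancellation.
async function consumeWithTimeout<T>(
  stream: CancellableStream<T>,
  timeoutMs: number,
  onEvent: (ev: T) => void,
): Promise<boolean> {
  let timedOut = false;
  const timer = setTimeout(() => {
    timedOut = true;
    stream.cancel();
  }, timeoutMs);
  try {
    for await (const ev of stream) {
      onEvent(ev);
    }
  } finally {
    // Always release the timer, whether the loop ended normally or threw.
    clearTimeout(timer);
  }
  return timedOut;
}
```

A call such as consumeWithTimeout(stream, 10000, render) then reproduces the ten-second limit while letting short runs exit immediately.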

Server-Sent Events (SSE) Pattern

For web applications, forward stream events as SSE:

// Express handler (with Fastify, call the same methods on reply.raw)
app.post('/api/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  const stream = await session.stream(req.body.message);
  for await (const ev of stream) {
    res.write(`data: ${JSON.stringify(ev)}\n\n`);
  }
  res.end();
});
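The SSE wire format also supports an optional event: field, which lets clients subscribe to individual event types instead of parsing every message. The helper below is a sketch of that framing, not something the SDK provides: toSseFrame and TypedEvent are hypothetical names, and the only assumption about the stream is that each event has a string type field.

```typescript
// Assumed minimal shape: every stream event carries a string `type`.
interface TypedEvent {
  type: string;
  [field: string]: unknown;
}

// Serialize one event as a named SSE frame.
function toSseFrame(ev: TypedEvent): string {
  // A line break inside the event name would corrupt the frame; strip it defensively.
  const name = ev.type.replace(/[\r\n]/g, '');
  // JSON.stringify escapes newlines inside strings, so the payload fits on one data: line.
  return `event: ${name}\ndata: ${JSON.stringify(ev)}\n\n`;
}
```

With named frames a browser client listens via addEventListener('text_delta', ...) rather than onmessage, which only receives frames without an event: field. Note also that the browser EventSource API only issues GET requests, so a POST route like the one above is typically read on the client with fetch and a stream reader.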

Python

from a3s_code import Agent, SessionOptions, PermissionPolicy

agent = Agent.create(open('config.acl').read())
opts = SessionOptions()
opts.permission_policy = PermissionPolicy(default_decision="allow")
session = agent.session('.', opts)

for event in session.stream("Explain Python generators"):
    if event.type == "text_delta":
        print(event.text, end="", flush=True)
    elif event.type == "end":
        print(f"\n[Done: {event.total_tokens} tokens]")