Programmable Orchestration

Express deterministic multi-agent orchestration in code with parallel fan-out, pipelines, and resumable workflows.

The snippets below use these Node SDK entry points: session.parallel, session.pipeline, and session.parallelResumable.

Model-Driven vs. Programmable

An agent already fans work out through model-driven delegation: the LLM decides when to call task / tasks and which children to launch. That is the right tool when you want the agent to plan the breakdown itself (see Build a Multi-Agent Review Flow).

This tutorial covers the programmable layer. Here the developer expresses the fan-out, the pipeline, or the resumable workflow directly as code, so the orchestration is reproducible, testable, budget-bounded, and resumable — independent of what the model decides on any given run. The grammar is exposed as exactly three Session methods; there is no top-level orchestrate or executor-config API.

goal
  -> session.parallel           barrier fan-out (input order preserved)
  -> session.pipeline           per-item chains, no inter-stage barrier
  -> session.parallelResumable  journaled, crash-resumable fan-out

Quick Start

session.parallel(specs) runs a fan-out of agent steps, bounded by the session's configured parallelism, and resolves with each step's outcome in input order. Each spec is an AgentStepSpecObject with camelCase keys: taskId, agent, description, prompt, and optional maxSteps, parentSessionId, outputSchema. A failed step surfaces as success: false without failing the whole batch.

import { Agent } from '@a3s-lab/code';

const agent = await Agent.create('config.acl');
const session = agent.session('.', {
  permissionPolicy: { defaultDecision: 'allow' },
});

const outcomes = await session.parallel([
  {
    taskId: 'sec',
    agent: 'review',
    description: 'Auth security review',
    prompt: 'Review src/auth for security risks.',
    maxSteps: 3,
  },
  {
    taskId: 'map',
    agent: 'explore',
    description: 'Map the auth surface',
    prompt: 'List every entry point under src/auth and what it trusts.',
    maxSteps: 3,
  },
]);

for (const outcome of outcomes) {
  // outcome: { taskId, sessionId, agent, output, success, structured }
  console.log(outcome.taskId, outcome.success, outcome.output);
}

Outcomes come back in the same order as specs, so outcomes[0] is always the sec step regardless of which finished first. Registry keys such as explore, review, plan, and general name the agent each step runs as.

Schema-Forced Step Output

Set outputSchema on a spec and the step must return JSON conforming to that schema. The validated object lands in outcome.structured (reusing the same coercion and repair as Structured Output). A coercion failure demotes the step to success: false, so a caller never treats unvalidated text as the promised object.

const [triage] = await session.parallel([
  {
    taskId: 'triage',
    agent: 'review',
    description: 'Triage the failing test',
    prompt: 'Classify the failure in tests/auth.test.ts.',
    outputSchema: {
      type: 'object',
      required: ['category', 'severity'],
      properties: {
        category: { type: 'string', enum: ['flaky', 'regression', 'env'] },
        severity: { type: 'string', enum: ['high', 'medium', 'low'] },
      },
    },
  },
]);

if (triage.success) {
  const { category, severity } = triage.structured; // validated object
  console.log(category, severity);
} else {
  // Schema coercion failed after repair: do not trust triage.output.
  console.error('triage did not conform to schema');
}

Pipelines

session.pipeline(items, stages) runs each item through a chain of stages, with no barrier between stages — item A can be in stage 3 while item B is still in stage 1. Each stage is a function (ctx) => spec | null, where ctx is { previous: StepOutcomeObject | null, item }: previous is the prior stage's outcome (null before the first stage), and item is the original input. Return an AgentStepSpecObject to run that step, or null to stop this item's chain. A chain also stops when a step fails (later stages would only build on a failed result).

const findings = await session.pipeline(
  ['src/auth', 'src/billing'],
  [
    // Stage 1: explore the module.
    (ctx) => ({
      taskId: `explore-${ctx.item}`,
      agent: 'explore',
      description: 'Survey the module',
      prompt: `Survey ${ctx.item} and list its risk surface.`,
    }),
    // Stage 2: review, building on stage 1's output.
    (ctx) => {
      try {
        const prior = ctx.previous?.output ?? '';
        return {
          taskId: `review-${ctx.item}`,
          agent: 'review',
          description: 'Review the risks',
          prompt: `Given this survey, find concrete bugs:\n\n${prior}`,
        };
      } catch {
        return null; // never throw; return null to stop this chain
      }
    },
  ],
);

// One entry per item (the last outcome, or null), in input order.
for (const outcome of findings) {
  if (outcome) console.log(outcome.taskId, outcome.output);
}

A Node stage callback must not throw — in this napi version a JS throw at return-conversion aborts the process (the same constraint as setBudgetGuard). Wrap your stage logic in try/catch and return null on error. A stage that hangs past timeoutMs (default 30s, the optional third argument to pipeline) fails closed: it is treated as null, stopping only that chain, rather than blocking forever.
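
The "no barrier between stages" scheduling described in this section does not depend on the binding language. The following is a toy model in Python using asyncio, not the SDK's scheduler: run_item and the sleep durations are hypothetical stand-ins for real agent steps. It shows one item finishing its second stage before another item has finished its first.

```python
import asyncio


async def run_item(item, delays, log):
    """Run one item through its stages; each stage is simulated by a sleep."""
    for stage_no, delay in enumerate(delays, start=1):
        await asyncio.sleep(delay)
        log.append((item, stage_no))  # record when this stage completes


async def main():
    log = []
    # Each item advances on its own: nothing forces every item to finish
    # stage 1 before any item may start stage 2.
    await asyncio.gather(
        run_item("fast", [0.01, 0.01], log),
        run_item("slow", [0.2, 0.01], log),
    )
    return log


log = asyncio.run(main())
print(log)
```

With a barrier between stages, the "fast" item would have to wait for "slow" to complete stage 1 before starting stage 2; here it completes both stages first.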

Resumable Workflows

session.parallelResumable(specs, workflowId) is like parallel, but resumable: progress is journaled per-step to a WorkflowCheckpoint under workflowId, so an interrupted run skips already-completed steps on the next call. It requires a sessionStore on the session and rejects otherwise.

Only successful steps are recorded, so a failed step retries on resume (its effect never completed). On full success the checkpoint is deleted — only a crash leaves one behind for resume.

import { Agent, FileSessionStore } from '@a3s-lab/code';

const agent = await Agent.create('config.acl');
const session = agent.session('.', {
  sessionStore: new FileSessionStore('./.a3s/sessions'),
  permissionPolicy: { defaultDecision: 'allow' },
});

const specs = [
  { taskId: 'a', agent: 'explore', description: 'Survey A', prompt: 'Survey module A.' },
  { taskId: 'b', agent: 'review', description: 'Review B', prompt: 'Review module B.' },
  { taskId: 'c', agent: 'plan', description: 'Plan C', prompt: 'Plan the fix for C.' },
];

// First call runs everything; if it crashes mid-run, the completed steps are
// journaled. Re-invoking with the same workflowId skips them and runs the rest.
const outcomes = await session.parallelResumable(specs, 'audit-2026-05');

Because the checkpoint is serializable and the executor is a seam (AgentExecutor), resuming on a different node is host-driven: the host substitutes its own executor and passes the same workflowId, and the combinator reuses the journaled outcomes without re-running completed steps.

Python

The same grammar is exposed as three Session methods, with snake_case keys.

session.parallel(specs) takes a list of spec dicts with keys task_id, agent, description, prompt, and optional max_steps, parent_session_id, output_schema. It returns one outcome dict per step (task_id / session_id / agent / output / success / structured) in input order; a failed step surfaces success: False without failing the batch.

from a3s_code import Agent, SessionOptions, PermissionPolicy

agent = Agent.create(open('config.acl').read())
opts = SessionOptions()
opts.permission_policy = PermissionPolicy(default_decision="allow")
session = agent.session('.', opts)

outcomes = session.parallel([
    {
        "task_id": "sec",
        "agent": "review",
        "description": "Auth security review",
        "prompt": "Review src/auth for security risks.",
        "max_steps": 3,
    },
    {
        "task_id": "map",
        "agent": "explore",
        "description": "Map the auth surface",
        "prompt": "List every entry point under src/auth and what it trusts.",
        "max_steps": 3,
    },
])

for outcome in outcomes:
    print(outcome["task_id"], outcome["success"], outcome["output"])
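
Because a failed step surfaces as success: False instead of raising, the caller has to sort the outcomes itself. A minimal sketch, assuming only the outcome dict shape described above: split_outcomes is a hypothetical helper, not part of a3s_code, and the sample data is made up for illustration.

```python
def split_outcomes(specs, outcomes):
    """Pair each spec with its outcome and separate successes from failures.

    Relies on the documented guarantee that outcomes arrive in input order.
    Hypothetical helper, not part of the a3s_code SDK.
    """
    succeeded, failed_specs = {}, []
    for spec, outcome in zip(specs, outcomes):
        if outcome["success"]:
            succeeded[outcome["task_id"]] = outcome["output"]
        else:
            # Keep the original spec so the caller can resubmit it.
            failed_specs.append(spec)
    return succeeded, failed_specs


# Made-up data shaped like the outcome dicts described above.
sample_specs = [{"task_id": "sec"}, {"task_id": "map"}]
sample_outcomes = [
    {"task_id": "sec", "success": True, "output": "2 findings"},
    {"task_id": "map", "success": False, "output": ""},
]
done, retry = split_outcomes(sample_specs, sample_outcomes)
print(done, retry)
```

The retry list holds the original specs, so it could be handed to a second session.parallel call if a retry is appropriate for the workload.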

Set output_schema on a spec to force schema-validated output into outcome["structured"]; a coercion failure demotes the step to success: False.

triage, = session.parallel([
    {
        "task_id": "triage",
        "agent": "review",
        "description": "Triage the failing test",
        "prompt": "Classify the failure in tests/auth.test.ts.",
        "output_schema": {
            "type": "object",
            "required": ["category", "severity"],
            "properties": {
                "category": {"type": "string", "enum": ["flaky", "regression", "env"]},
                "severity": {"type": "string", "enum": ["high", "medium", "low"]},
            },
        },
    },
])

if triage["success"]:
    structured = triage["structured"]  # validated object
    print(structured["category"], structured["severity"])
else:
    # Schema coercion failed: do not trust triage["output"].
    print("triage did not conform to schema")
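
When several steps carry an output_schema, the success check can be centralized. A small sketch, assuming the outcome dict shape described above: structured_or_default is a hypothetical helper, not an SDK function, and the two sample outcomes are invented.

```python
def structured_or_default(outcome, default=None):
    """Return the validated object only when the step succeeded.

    Hypothetical helper: falls back to `default` when the step failed or
    carried no structured payload, so raw text is never used in its place.
    """
    if outcome.get("success") and isinstance(outcome.get("structured"), dict):
        return outcome["structured"]
    return default


# Invented outcomes: one that passed schema coercion, one that did not.
ok = {
    "task_id": "triage",
    "success": True,
    "output": '{"category": "flaky", "severity": "low"}',
    "structured": {"category": "flaky", "severity": "low"},
}
bad = {"task_id": "triage", "success": False, "output": "not json", "structured": None}

print(structured_or_default(ok))
print(structured_or_default(bad, default={}))
```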

session.pipeline(items, stages) runs each item through a chain of stages with no barrier between stages. Each stage is a callable stage(ctx) -> dict | None, where ctx = {"previous": <outcome dict or None>, "item": <item>}. Return a spec dict to run that step, or None to stop the item's chain. A Python stage that raises is caught and treated as None (stopping only that chain).

def explore_stage(ctx):
    return {
        "task_id": f"explore-{ctx['item']}",
        "agent": "explore",
        "description": "Survey the module",
        "prompt": f"Survey {ctx['item']} and list its risk surface.",
    }

def review_stage(ctx):
    prior = ctx["previous"]["output"] if ctx["previous"] else ""
    return {
        "task_id": f"review-{ctx['item']}",
        "agent": "review",
        "description": "Review the risks",
        "prompt": f"Given this survey, find concrete bugs:\n\n{prior}",
    }

findings = session.pipeline(["src/auth", "src/billing"], [explore_stage, review_stage])
for outcome in findings:
    if outcome:
        print(outcome["task_id"], outcome["output"])
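
The stopping rules for a single item's chain can be summarized with a toy model. This is a sketch of the documented rules, not the SDK's implementation: run_chain and fake_execute are hypothetical, and returning the failed outcome as the item's last outcome is an assumption.

```python
def run_chain(item, stages, execute):
    """Toy model of one item's chain; `execute` stands in for running a step."""
    previous = None
    for stage in stages:
        try:
            spec = stage({"previous": previous, "item": item})
        except Exception:
            spec = None  # a raising Python stage is treated as None
        if spec is None:
            break  # the stage asked to stop (or raised)
        previous = execute(spec)
        if not previous["success"]:
            break  # later stages would only build on a failed result
    return previous  # last outcome, or None if no step ever ran


def fake_execute(spec):
    # Stand-in executor: any prompt containing "boom" fails.
    ok = "boom" not in spec["prompt"]
    output = spec["prompt"].upper() if ok else ""
    return {"task_id": spec["task_id"], "success": ok, "output": output}


def first(ctx):
    return {"task_id": f"s1-{ctx['item']}", "prompt": ctx["item"]}


def second(ctx):
    if ctx["previous"]["output"] == "SKIP":
        return None  # stop this item's chain early
    return {"task_id": f"s2-{ctx['item']}", "prompt": "seen " + ctx["previous"]["output"]}


def broken(ctx):
    raise RuntimeError("stage bug")


results = [run_chain(i, [first, second], fake_execute) for i in ["a", "skip", "boom"]]
never_ran = run_chain("a", [broken], fake_execute)
```

In this model, "a" runs both stages, "skip" stops after stage 1 because the second stage returns None, "boom" stops after its first step fails, and a chain whose first stage raises yields None.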

session.parallel_resumable(specs, workflow_id) mirrors parallel but journals progress under workflow_id; it requires a session_store on the session and raises otherwise. Only successful steps are recorded (a failed step retries on resume), and the checkpoint is deleted on full success.

from a3s_code import Agent, SessionOptions, FileSessionStore

agent = Agent.create(open('config.acl').read())
opts = SessionOptions()
opts.session_store = FileSessionStore("./.a3s/sessions")
session = agent.session('.', opts)

specs = [
    {"task_id": "a", "agent": "explore", "description": "Survey A", "prompt": "Survey module A."},
    {"task_id": "b", "agent": "review", "description": "Review B", "prompt": "Review module B."},
    {"task_id": "c", "agent": "plan", "description": "Plan C", "prompt": "Plan the fix for C."},
]
outcomes = session.parallel_resumable(specs, "audit-2026-05")
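
The journaling rules can be illustrated with a toy model. This is a sketch under stated assumptions, not how the SDK stores checkpoints: run_resumable is hypothetical, a plain dict stands in for the session store, steps run sequentially rather than in parallel, and keeping the checkpoint after a run that ends with a failed step is an assumption.

```python
def run_resumable(specs, workflow_id, journal, execute):
    """Toy model of a journaled fan-out; `journal` is a dict standing in for the store."""
    done = journal.setdefault(workflow_id, {})
    outcomes = []
    for spec in specs:
        task_id = spec["task_id"]
        if task_id in done:
            outcomes.append(done[task_id])  # reuse the journaled outcome, no re-run
            continue
        outcome = execute(spec)
        if outcome["success"]:
            done[task_id] = outcome  # only successful steps are recorded
        outcomes.append(outcome)
    if all(o["success"] for o in outcomes):
        del journal[workflow_id]  # full success: the checkpoint is removed
    return outcomes


calls = []
fail_once = {"b"}  # step "b" fails on its first attempt only


def flaky_execute(spec):
    calls.append(spec["task_id"])
    ok = spec["task_id"] not in fail_once
    fail_once.discard(spec["task_id"])
    return {"task_id": spec["task_id"], "success": ok, "output": "ok" if ok else ""}


journal = {}
demo_specs = [{"task_id": t} for t in ("a", "b", "c")]
first_run = run_resumable(demo_specs, "audit", journal, flaky_execute)
kept_after_first = sorted(journal["audit"])  # steps journaled so far
second_run = run_resumable(demo_specs, "audit", journal, flaky_execute)
```

On the second call only "b" is executed again; "a" and "c" come from the journal, and the checkpoint disappears once every step has succeeded.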