Programmable Orchestration
Express deterministic multi-agent orchestration in code with parallel fan-out, pipelines, and resumable workflows.
The snippets below use these Node SDK entry points:
session.parallel, session.pipeline, and session.parallelResumable.
Model-Driven vs. Programmable
An agent already fans work out through model-driven delegation: the LLM decides
when to call task / tasks and which children to launch. That is the right tool
when you want the agent to plan the breakdown itself (see Build a Multi-Agent
Review Flow).
This tutorial covers the programmable layer. Here the developer expresses the
fan-out, the pipeline, or the resumable workflow directly as code, so the
orchestration is reproducible, testable, budget-bounded, and resumable —
independent of what the model decides on any given run. The grammar is exposed
as exactly three Session methods; there is no top-level orchestrate or
executor-config API.
goal
  -> session.parallel            barrier fan-out (input order preserved)
  -> session.pipeline            per-item chains, no inter-stage barrier
  -> session.parallelResumable   journaled, crash-resumable fan-out
Quick Start
session.parallel(specs) runs a fan-out of agent steps, bounded by the
session's configured parallelism, and resolves with each step's outcome in
input order. Each spec is an AgentStepSpecObject with camelCase keys:
taskId, agent, description, prompt, and optional maxSteps,
parentSessionId, outputSchema. A failed step surfaces as success: false
without failing the whole batch.
    import { Agent } from '@a3s-lab/code';

    const agent = await Agent.create('config.acl');
    const session = agent.session('.', {
      permissionPolicy: { defaultDecision: 'allow' },
    });

    const outcomes = await session.parallel([
      {
        taskId: 'sec',
        agent: 'review',
        description: 'Auth security review',
        prompt: 'Review src/auth for security risks.',
        maxSteps: 3,
      },
      {
        taskId: 'map',
        agent: 'explore',
        description: 'Map the auth surface',
        prompt: 'List every entry point under src/auth and what it trusts.',
        maxSteps: 3,
      },
    ]);

    for (const outcome of outcomes) {
      // outcome: { taskId, sessionId, agent, output, success, structured }
      console.log(outcome.taskId, outcome.success, outcome.output);
    }
Outcomes come back in the same order as specs, so outcomes[0] is always the
sec step regardless of which finished first. Registry keys such as explore,
review, plan, and general name the agent each step runs as.
Schema-Forced Step Output
Set outputSchema on a spec and the step must return JSON conforming to that
schema. The validated object lands in outcome.structured (reusing the same
coercion and repair as Structured Output). A
coercion failure demotes the step to success: false, so a caller never treats
unvalidated text as the promised object.
    const [triage] = await session.parallel([
      {
        taskId: 'triage',
        agent: 'review',
        description: 'Triage the failing test',
        prompt: 'Classify the failure in tests/auth.test.ts.',
        outputSchema: {
          type: 'object',
          required: ['category', 'severity'],
          properties: {
            category: { type: 'string', enum: ['flaky', 'regression', 'env'] },
            severity: { type: 'string', enum: ['high', 'medium', 'low'] },
          },
        },
      },
    ]);

    if (triage.success) {
      const { category, severity } = triage.structured; // validated object
      console.log(category, severity);
    } else {
      // schema coercion failed after repair; do not trust triage.output
      console.error('triage did not conform to schema');
    }
Pipelines
session.pipeline(items, stages) runs each item through a chain of stages,
with no barrier between stages — item A can be in stage 3 while item B is
still in stage 1. Each stage is a function (ctx) => spec | null, where ctx
is { previous: StepOutcomeObject | null, item }: previous is the prior
stage's outcome (null before the first stage), and item is the original
input. Return an AgentStepSpecObject to run that step, or null to stop this
item's chain. A chain also stops when a step fails (later stages would only
build on a failed result).
    const findings = await session.pipeline(
      ['src/auth', 'src/billing'],
      [
        // Stage 1: explore the module.
        (ctx) => ({
          taskId: `explore-${ctx.item}`,
          agent: 'explore',
          description: 'Survey the module',
          prompt: `Survey ${ctx.item} and list its risk surface.`,
        }),
        // Stage 2: review, building on stage 1's output.
        (ctx) => {
          try {
            const prior = ctx.previous?.output ?? '';
            return {
              taskId: `review-${ctx.item}`,
              agent: 'review',
              description: 'Review the risks',
              prompt: `Given this survey, find concrete bugs:\n\n${prior}`,
            };
          } catch {
            return null; // never throw; return null to stop this chain
          }
        },
      ],
    );

    // One entry per item (the last outcome, or null), in input order.
    for (const outcome of findings) {
      if (outcome) console.log(outcome.taskId, outcome.output);
    }
A Node stage callback must not throw — in this napi version a JS throw at
return-conversion aborts the process (the same constraint as setBudgetGuard).
Wrap your stage logic in try/catch and return null on error. A stage that
hangs past timeoutMs (default 30s, the optional third argument to pipeline)
fails closed: it is treated as null, stopping only that chain, rather than
blocking forever.
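The "no barrier between stages" behavior can be modeled in a few lines of plain Python with asyncio. This is only a toy sketch of the semantics described above, not the SDK's implementation: fake_step, run_chain, pipeline, the delay key, and the events log are all hypothetical names made up for the illustration, and the agent step is replaced by a sleep.

```python
import asyncio
from typing import Any, Callable, Optional

Spec = dict[str, Any]
Outcome = dict[str, Any]
Stage = Callable[[dict[str, Any]], Optional[Spec]]

events: list[str] = []  # start/end markers, in the order they happen


async def fake_step(spec: Spec) -> Outcome:
    """Stand-in for an agent step; the spec carries its own simulated duration."""
    events.append(f"start {spec['task_id']}")
    await asyncio.sleep(spec["delay"])
    events.append(f"end {spec['task_id']}")
    return {"task_id": spec["task_id"], "output": "ok", "success": True}


async def run_chain(item: Any, stages: list[Stage]) -> Optional[Outcome]:
    """Run one item through the stages; return its last outcome, or None."""
    previous: Optional[Outcome] = None
    for stage in stages:
        spec = stage({"previous": previous, "item": item})
        if spec is None:
            break  # the stage asked to stop this item's chain
        previous = await fake_step(spec)
        if not previous["success"]:
            break  # a failed step also stops the chain
    return previous


async def pipeline(items: list[Any], stages: list[Stage]) -> list[Optional[Outcome]]:
    # One independent chain per item; gather returns results in input order.
    return await asyncio.gather(*(run_chain(item, stages) for item in items))


DELAYS = {"slow": 0.2, "fast": 0.01}  # simulated step durations in seconds


def make_stage(n: int) -> Stage:
    return lambda ctx: {"task_id": f"s{n}-{ctx['item']}", "delay": DELAYS[ctx["item"]]}


results = asyncio.run(pipeline(["slow", "fast"], [make_stage(1), make_stage(2)]))
print([r["task_id"] for r in results])
print(events)
```

In this model the fast item finishes its second stage while the slow item is still in its first, yet the result list still follows input order. A barrier-style runner would instead hold the fast item back until every item had cleared stage 1.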
Resumable Workflows
session.parallelResumable(specs, workflowId) is like parallel, but
resumable: progress is journaled per-step to a WorkflowCheckpoint under
workflowId, so an interrupted run skips already-completed steps on the next
call. It requires a sessionStore on the session and rejects otherwise.
Only successful steps are recorded, so a failed step retries on resume (its effect never completed). On full success the checkpoint is deleted — only a crash leaves one behind for resume.
    import { Agent, FileSessionStore } from '@a3s-lab/code';

    const agent = await Agent.create('config.acl');
    const session = agent.session('.', {
      sessionStore: new FileSessionStore('./.a3s/sessions'),
      permissionPolicy: { defaultDecision: 'allow' },
    });

    const specs = [
      { taskId: 'a', agent: 'explore', description: 'Survey A', prompt: 'Survey module A.' },
      { taskId: 'b', agent: 'review', description: 'Review B', prompt: 'Review module B.' },
      { taskId: 'c', agent: 'plan', description: 'Plan C', prompt: 'Plan the fix for C.' },
    ];

    // First call runs everything; if it crashes mid-run, the completed steps are
    // journaled. Re-invoking with the same workflowId skips them and runs the rest.
    const outcomes = await session.parallelResumable(specs, 'audit-2026-05');
Because the checkpoint is serializable and the executor is a seam
(AgentExecutor), resuming on a different node is host-driven: the host
substitutes its own executor and passes the same workflowId, and the
combinator reuses the journaled outcomes without re-running completed steps.
Python
The same grammar is exposed as three Session methods, with snake_case keys.
session.parallel(specs) takes a list of spec dicts with keys task_id,
agent, description, prompt, and optional max_steps,
parent_session_id, output_schema. It returns one outcome dict per step
(task_id / session_id / agent / output / success / structured) in
input order; a failed step surfaces success: False without failing the batch.
    from a3s_code import Agent, SessionOptions, PermissionPolicy

    agent = Agent.create(open('config.acl').read())
    opts = SessionOptions()
    opts.permission_policy = PermissionPolicy(default_decision="allow")
    session = agent.session('.', opts)

    outcomes = session.parallel([
        {
            "task_id": "sec",
            "agent": "review",
            "description": "Auth security review",
            "prompt": "Review src/auth for security risks.",
            "max_steps": 3,
        },
        {
            "task_id": "map",
            "agent": "explore",
            "description": "Map the auth surface",
            "prompt": "List every entry point under src/auth and what it trusts.",
            "max_steps": 3,
        },
    ])

    for outcome in outcomes:
        print(outcome["task_id"], outcome["success"], outcome["output"])
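Because a failed step does not fail the batch, the caller has to separate successes from failures. A minimal sketch, assuming only the outcome keys listed above: partition_outcomes is a hypothetical helper, not part of a3s_code, and the sample list is a hand-written stand-in for what session.parallel might return.

```python
from typing import Any

Outcome = dict[str, Any]


def partition_outcomes(outcomes: list[Outcome]) -> tuple[list[Outcome], list[Outcome]]:
    """Split outcomes into (succeeded, failed), keeping input order in each list."""
    succeeded = [o for o in outcomes if o.get("success")]
    failed = [o for o in outcomes if not o.get("success")]
    return succeeded, failed


# Hand-written stand-ins shaped like the documented outcome dicts.
sample = [
    {"task_id": "sec", "session_id": "s1", "agent": "review",
     "output": "2 findings", "success": True, "structured": None},
    {"task_id": "map", "session_id": "s2", "agent": "explore",
     "output": "", "success": False, "structured": None},
]

ok, bad = partition_outcomes(sample)
retry_ids = [o["task_id"] for o in bad]  # candidates for a follow-up call
print("succeeded:", [o["task_id"] for o in ok])
print("to retry:", retry_ids)
```

Keeping the failed task ids lets the caller rebuild only those specs for a second session.parallel call.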
Set output_schema on a spec to force schema-validated output into
outcome["structured"]; a coercion failure demotes the step to
success: False.
    triage, = session.parallel([
        {
            "task_id": "triage",
            "agent": "review",
            "description": "Triage the failing test",
            "prompt": "Classify the failure in tests/auth.test.ts.",
            "output_schema": {
                "type": "object",
                "required": ["category", "severity"],
                "properties": {
                    "category": {"type": "string", "enum": ["flaky", "regression", "env"]},
                    "severity": {"type": "string", "enum": ["high", "medium", "low"]},
                },
            },
        },
    ])

    if triage["success"]:
        structured = triage["structured"]  # validated object
        print(structured["category"], structured["severity"])
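Since a coercion failure only shows up as success: False, it can help to route every read of outcome["structured"] through one guard. The sketch below assumes nothing beyond the outcome keys documented above. structured_or_none is a hypothetical helper rather than part of a3s_code, and the two sample dicts are hand-written stand-ins for real outcomes.

```python
from typing import Any, Optional

Outcome = dict[str, Any]


def structured_or_none(outcome: Outcome) -> Optional[dict[str, Any]]:
    """Return outcome["structured"] only when the step succeeded, else None."""
    if not outcome.get("success"):
        return None  # failed or non-conforming step: ignore any partial data
    structured = outcome.get("structured")
    return structured if isinstance(structured, dict) else None


# Hand-written stand-ins for a conforming and a non-conforming triage step.
conforming = {"task_id": "triage", "success": True,
              "structured": {"category": "flaky", "severity": "low"}}
rejected = {"task_id": "triage", "success": False,
            "structured": None, "output": "It looks flaky to me."}

for outcome in (conforming, rejected):
    data = structured_or_none(outcome)
    if data is None:
        print(outcome["task_id"], "did not conform to schema")
    else:
        print(data["category"], data["severity"])
```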
session.pipeline(items, stages) runs each item through a chain of stages with
no barrier between stages. Each stage is a callable stage(ctx) -> dict | None,
where ctx = {"previous": <outcome dict or None>, "item": <item>}. Return a
spec dict to run that step, or None to stop the item's chain. A Python stage
that raises is caught and treated as None (stopping only that chain).
    def explore_stage(ctx):
        return {
            "task_id": f"explore-{ctx['item']}",
            "agent": "explore",
            "description": "Survey the module",
            "prompt": f"Survey {ctx['item']} and list its risk surface.",
        }

    def review_stage(ctx):
        prior = ctx["previous"]["output"] if ctx["previous"] else ""
        return {
            "task_id": f"review-{ctx['item']}",
            "agent": "review",
            "description": "Review the risks",
            "prompt": f"Given this survey, find concrete bugs:\n\n{prior}",
        }

    findings = session.pipeline(["src/auth", "src/billing"], [explore_stage, review_stage])

    for outcome in findings:
        if outcome:
            print(outcome["task_id"], outcome["output"])
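A raising stage is treated as None, so a bug inside a stage can look the same as a deliberate stop. One way to tell them apart is to log the exception before returning None yourself. This is a sketch under that assumption: logged_stage and review_if_surveyed are hypothetical names, and only the ctx shape ({"previous": ..., "item": ...}) comes from the description above.

```python
import functools
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("pipeline.stages")

Ctx = dict[str, Any]
Stage = Callable[[Ctx], Optional[dict[str, Any]]]


def logged_stage(stage: Stage) -> Stage:
    """Wrap a stage so an exception is logged, then reported as None."""

    @functools.wraps(stage)
    def wrapper(ctx: Ctx) -> Optional[dict[str, Any]]:
        try:
            return stage(ctx)
        except Exception:
            # Same visible result as an uncaught raise (the chain stops),
            # but the traceback is kept in the log.
            logger.exception("stage %s failed for item %r",
                             stage.__name__, ctx.get("item"))
            return None

    return wrapper


@logged_stage
def review_if_surveyed(ctx: Ctx) -> Optional[dict[str, Any]]:
    """Stop the chain with None when the previous stage produced no output."""
    prior = ctx["previous"]["output"]  # raises TypeError if previous is None
    if not prior.strip():
        return None  # deliberate stop: nothing to review
    return {
        "task_id": f"review-{ctx['item']}",
        "agent": "review",
        "description": "Review the risks",
        "prompt": f"Given this survey, find concrete bugs:\n\n{prior}",
    }


spec = review_if_surveyed({"previous": {"output": "login() trusts headers"},
                           "item": "src/auth"})
print(spec["task_id"])
```

A stage wrapped this way can be passed to session.pipeline like any other callable. The deliberate None and the logged failure both stop only that item's chain.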
session.parallel_resumable(specs, workflow_id) mirrors parallel but journals
progress under workflow_id; it requires a session_store on the session and
raises otherwise. Only successful steps are recorded (a failed step retries on
resume), and the checkpoint is deleted on full success.
    from a3s_code import Agent, SessionOptions, FileSessionStore

    agent = Agent.create(open('config.acl').read())
    opts = SessionOptions()
    opts.session_store = FileSessionStore("./.a3s/sessions")
    session = agent.session('.', opts)

    specs = [
        {"task_id": "a", "agent": "explore", "description": "Survey A", "prompt": "Survey module A."},
        {"task_id": "b", "agent": "review", "description": "Review B", "prompt": "Review module B."},
        {"task_id": "c", "agent": "plan", "description": "Plan C", "prompt": "Plan the fix for C."},
    ]

    outcomes = session.parallel_resumable(specs, "audit-2026-05")
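The journaling rules (skip completed steps, record only successes, delete the checkpoint on full success) can be illustrated with a toy in-memory model. This is not how a3s_code implements them: run_resumable, checkpoints, and fake_step are hypothetical, steps run sequentially for clarity, and the model assumes a run that ends with a failed step keeps its checkpoint, which the text implies but does not state outright.

```python
from typing import Any, Callable

Spec = dict[str, Any]
Outcome = dict[str, Any]

# workflow_id -> {task_id: outcome}; stands in for a persistent session store.
checkpoints: dict[str, dict[str, Outcome]] = {}


def run_resumable(specs: list[Spec], workflow_id: str,
                  run_step: Callable[[Spec], Outcome]) -> list[Outcome]:
    """Toy model: skip journaled steps, record only successes, clean up when done."""
    journal = checkpoints.setdefault(workflow_id, {})
    outcomes: list[Outcome] = []
    for spec in specs:  # sequential here; the real fan-out runs in parallel
        task_id = spec["task_id"]
        if task_id in journal:
            outcomes.append(journal[task_id])  # completed earlier: reuse it
            continue
        outcome = run_step(spec)
        if outcome["success"]:
            journal[task_id] = outcome  # failures are never journaled
        outcomes.append(outcome)
    if all(o["success"] for o in outcomes):
        del checkpoints[workflow_id]  # full success: nothing left to resume
    return outcomes


calls: list[str] = []   # every step actually executed, in order
failing = {"b"}         # task ids that fail on the first run


def fake_step(spec: Spec) -> Outcome:
    calls.append(spec["task_id"])
    return {"task_id": spec["task_id"], "output": "",
            "success": spec["task_id"] not in failing}


specs = [{"task_id": t} for t in ("a", "b", "c")]
first = run_resumable(specs, "audit", fake_step)   # b fails, a and c are journaled
failing.clear()
second = run_resumable(specs, "audit", fake_step)  # only b runs again
print(calls)
```

On the second call only step b executes. Steps a and c come from the journal, and the checkpoint disappears once all three have succeeded.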