Orchestration
Programmable, deterministic multi-agent orchestration: parallel fan-out, barrier-free pipelines, and resumable workflows
Orchestration is the programmable sibling of model-driven delegation. With
Tasks and Teams, the LLM decides at
run time whether to call task / parallel_task, so the shape of the fan-out is
whatever the model chose. Orchestration moves that decision into your code: you
express fan-out, pipelines, and verification panels as a grammar, so the shape
is reproducible, testable, budget-bounded, and resumable, independent of what
the model picks.
Reach for orchestration when the structure of the work is known to the host ahead of time (run these three reviewers in parallel; flow each candidate through explore → verify → review; resume this batch after a crash). Reach for Tasks/Teams when you want the model to decide whether and how to delegate.
The framework / host boundary (the seam)
Everything in this layer is written against a single seam, AgentExecutor:
"run this step, give me the result." That seam splits responsibilities cleanly:
- The framework owns the grammar: which steps exist, how they compose, the concurrency hint, and the serializable contracts AgentStepSpec/StepOutcome.
- The host owns placement: transport, scheduling, and where a step actually runs.
The in-box default executor (TaskExecutor) runs every step locally,
in-process, on tokio. A host substitutes its own AgentExecutor to place steps
across a cluster; the combinators never observe where a step ran, so the same
orchestration scales from one process to a cluster without change.
concurrency_hint() is advisory, not a hard local bound — it is the lever
that lets orchestration scale past a single process (a scheduler-backed host
returns its cluster-wide target instead of a local cap).
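Here is a minimal Python sketch of the seam. It is written under stated assumptions: Executor, LocalExecutor, execute, and run_all are hypothetical names, not the SDK's AgentExecutor API. The point is that the combinator talks only to the seam. Swapping in a different executor therefore changes placement without touching run_all. For simplicity the sketch uses concurrency_hint() as a local semaphore size, which is the single-process case. A cluster-backed host would report a cluster-wide target instead.

```python
import asyncio
from typing import Protocol


class Executor(Protocol):
    """Hypothetical stand-in for the AgentExecutor seam."""

    async def execute(self, spec: dict) -> dict: ...

    def concurrency_hint(self) -> int: ...


class LocalExecutor:
    """Runs every step in-process; the child agent is faked."""

    def __init__(self, limit: int = 4) -> None:
        self._limit = limit

    async def execute(self, spec: dict) -> dict:
        await asyncio.sleep(0)  # placeholder for a real child-agent run
        return {"task_id": spec["task_id"], "output": f"ran {spec['agent']}", "success": True}

    def concurrency_hint(self) -> int:
        return self._limit  # single process: the hint doubles as a local cap


async def run_all(executor: Executor, specs: list) -> list:
    # The combinator sees only the seam; it never learns where a step ran.
    gate = asyncio.Semaphore(executor.concurrency_hint())

    async def one(spec: dict) -> dict:
        async with gate:
            return await executor.execute(spec)

    return list(await asyncio.gather(*(one(s) for s in specs)))


outcomes = asyncio.run(run_all(LocalExecutor(), [{"task_id": "a", "agent": "explore"}]))
print(outcomes)
```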
The SDK wires this up for you: AgentSession::agent_executor() returns the
session-backed executor (it runs each step as a child agent on this node,
inheriting the session's agent registry, LLM client, workspace, and MCP tools),
and session_store() returns the session's store. The parallel /
pipeline / parallelResumable methods call these for you.
Step contracts
A step is described by an AgentStepSpec and resolves to a StepOutcome. Both
are serializable on purpose: a host may ship a spec to another node, and the
resumable combinator persists outcomes into checkpoints.
AgentStepSpec fields:
- task_id: stable id for the step (you assign it); flows into lifecycle events and checkpoints.
- agent: registry key of the agent to run (e.g. explore, review).
- description: short human label for display/tracking.
- prompt: the instruction handed to the child agent.
- max_steps (optional): per-step tool-round cap.
- parent_session_id (optional): parent session id for event correlation.
- output_schema (optional): when set, the step must return a value conforming to this JSON Schema (see Schema-forced step output).

StepOutcome fields:
- task_id: the originating step's id.
- session_id: the child run's session id (failed steps remain addressable).
- agent: the agent that ran.
- output: the step's text output.
- success: false for a failed or panicked step (a failure never drops a sibling).
- structured (optional): the schema-validated object, present only when the spec carried an output_schema.
The key casing differs by SDK:
| Concept | Node (camelCase) | Python (snake_case) |
|---|---|---|
| step id | taskId | task_id |
| tool-round cap | maxSteps | max_steps |
| parent session | parentSessionId | parent_session_id |
| forced schema | outputSchema | output_schema |
agent, description, prompt, output, success, and structured are
spelled the same in both SDKs.
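Both contracts are plain data, so they survive a JSON round trip. The sketch below mirrors the spec fields as a Python dataclass and renames keys between the two SDK casings. StepSpec, SNAKE_TO_CAMEL, and to_node_keys are hypothetical illustration helpers, not part of either SDK. Dropping unset optional fields is also an assumption of this sketch.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class StepSpec:
    """Hypothetical mirror of AgentStepSpec (Python casing)."""
    task_id: str
    agent: str
    description: str
    prompt: str
    max_steps: Optional[int] = None
    parent_session_id: Optional[str] = None
    output_schema: Optional[dict] = None


# Only the multi-word keys differ between the SDKs (see the table above).
SNAKE_TO_CAMEL = {
    "task_id": "taskId",
    "max_steps": "maxSteps",
    "parent_session_id": "parentSessionId",
    "output_schema": "outputSchema",
}


def to_node_keys(spec: dict) -> dict:
    """Rename snake_case keys to camelCase and drop unset (None) fields."""
    return {SNAKE_TO_CAMEL.get(k, k): v for k, v in spec.items() if v is not None}


spec = StepSpec(task_id="review", agent="review", description="Correctness",
                prompt="Review the diff for correctness risks.")
wire = json.dumps(asdict(spec))          # serializable: could be shipped to another node
restored = StepSpec(**json.loads(wire))  # and rebuilt on the far side
print(to_node_keys(asdict(spec)))
```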
parallel — barrier fan-out
session.parallel(specs) runs every spec as a fan-out and resolves with one
StepOutcome per spec, in input order. It maps to the core
execute_steps_parallel combinator. It is a barrier: it awaits every step
before returning.
Each branch is isolated — a step that fails or panics becomes
success: false; it never drops a sibling. Concurrency is bounded by the
executor's concurrency hint (the session's configured parallelism by default).
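A barrier fan-out with per-branch isolation can be sketched with asyncio. A semaphore bounds how many steps are in flight, and each branch converts its own exception into success: False. asyncio.gather acts as the barrier and preserves input order. fake_step and parallel are hypothetical names, and a Python exception stands in for a failed or panicked step.

```python
import asyncio


async def fake_step(spec: dict) -> str:
    # Hypothetical step body; agent "boom" simulates a crashing child agent.
    await asyncio.sleep(0.01)
    if spec["agent"] == "boom":
        raise RuntimeError("step crashed")
    return f"{spec['agent']} done"


async def parallel(specs: list, limit: int) -> list:
    gate = asyncio.Semaphore(limit)  # bound in-flight steps, like the concurrency hint

    async def branch(spec: dict) -> dict:
        async with gate:
            try:
                output = await fake_step(spec)
                return {"task_id": spec["task_id"], "output": output, "success": True}
            except Exception as exc:  # isolate the failure to this branch only
                return {"task_id": spec["task_id"], "output": str(exc), "success": False}

    # gather is the barrier: it returns after every branch, in input order.
    return list(await asyncio.gather(*(branch(s) for s in specs)))


specs = [{"task_id": "a", "agent": "explore"}, {"task_id": "b", "agent": "boom"},
         {"task_id": "c", "agent": "review"}]
outcomes = asyncio.run(parallel(specs, limit=2))
print([(o["task_id"], o["success"]) for o in outcomes])
```

The failing branch still yields an outcome in its slot, and its siblings complete normally.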
const outcomes = await session.parallel([
  { taskId: 'explore', agent: 'explore', description: 'Risky changes',
    prompt: 'Find risky changed files in this diff.' },
  { taskId: 'verify', agent: 'verification', description: 'Test gaps',
    prompt: 'Identify missing or weak verification.' },
  { taskId: 'review', agent: 'review', description: 'Correctness',
    prompt: 'Review the diff for correctness risks.' },
]);
for (const outcome of outcomes) {
  if (outcome.success) {
    console.log(outcome.taskId, outcome.output);
  } else {
    console.warn('failed:', outcome.taskId, outcome.output);
  }
}

outcomes = session.parallel([
    {"task_id": "explore", "agent": "explore", "description": "Risky changes",
     "prompt": "Find risky changed files in this diff."},
    {"task_id": "verify", "agent": "verification", "description": "Test gaps",
     "prompt": "Identify missing or weak verification."},
    {"task_id": "review", "agent": "review", "description": "Correctness",
     "prompt": "Review the diff for correctness risks."},
])
for outcome in outcomes:
    if outcome["success"]:
        print(outcome["task_id"], outcome["output"])
    else:
        print("failed:", outcome["task_id"], outcome["output"])

pipeline — no barrier between stages
session.pipeline(items, stages) flows each item through a chain of stages
independently — there is no barrier between stages, so item A can be in
stage 3 while item B is still in stage 1. Wall-clock time is the slowest single
chain, not the sum-of-slowest-per-stage that a per-stage barrier would incur.
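The wall-clock claim is easy to check with made-up numbers. The example assumes two items, three stages, and enough concurrency that nothing queues; all durations are hypothetical. A per-stage barrier costs the sum of each stage's slowest step. Barrier-free chains cost only the slowest single chain.

```python
# Hypothetical per-step durations in seconds: durations[item][stage].
durations = {
    "A": [1, 9, 1],
    "B": [9, 1, 1],
}
stages = range(3)

# With a barrier after each stage, every stage waits for its slowest item.
barrier = sum(max(d[s] for d in durations.values()) for s in stages)

# Without barriers, each item's chain runs on its own; the slowest chain wins.
no_barrier = max(sum(d) for d in durations.values())

print(barrier, no_barrier)  # 19 11
```

Each item here is slow in a different stage, so the barrier version makes both items pay for both slow stages.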
Stages are spec-builders, not specs: each stage receives the prior outcome and
the original item and returns the next step to run, or null / None to stop
that item's chain early. A failed step also stops the chain (a later stage would
only build on a failed result). The callback shapes:
- Node: (ctx) => spec | null, where ctx = { previous: StepOutcome | null, item }
- Python: stage(ctx) -> spec | None, where ctx = {"previous": <dict|None>, "item": <item>}
A stage can branch on the prior outcome — e.g. "verify the finding the review stage produced".
Constraints (from the source):
- Per-stage output_schema is not supported on pipeline stages; use parallel for schema-validated steps.
- Node: a stage callback must not throw. A throw aborts the process (the same constraint as setBudgetGuard), so wrap your logic in try/catch and return null on error.
- Node: a stage that hangs past timeoutMs (the 3rd argument, default 30000 ms) fails closed: it is treated as null, stopping only that chain.
- Python: a stage callable that raises is caught and treated as None, stopping only that chain.

A Node pipeline stage callback must not throw. In this napi version, a JS throw at return conversion aborts the process (the same fail-closed constraint as setBudgetGuard). Always wrap stage logic in try/catch and return null on error.
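The stage semantics above can be sketched as one independent asyncio chain per item. Each stage builds the next spec from ctx, and the chain stops on None, stops after a failed step, and treats a raising stage as None. run_step, chain, and pipeline are hypothetical stand-ins for the executor and the combinator. Returning each item's last outcome is an assumption of the sketch, not necessarily what the SDK returns.

```python
import asyncio
from typing import Any, Callable, Optional

Stage = Callable[[dict], Optional[dict]]  # ctx -> next spec, or None to stop


async def run_step(spec: dict) -> dict:
    # Hypothetical executor: any prompt containing "FAIL" yields a failed step.
    await asyncio.sleep(0.01)
    ok = "FAIL" not in spec["prompt"]
    return {"task_id": spec["task_id"], "output": spec["prompt"].upper(), "success": ok}


async def chain(item: Any, stages: list[Stage]) -> Optional[dict]:
    previous: Optional[dict] = None
    for stage in stages:
        try:
            spec = stage({"previous": previous, "item": item})
        except Exception:
            spec = None  # a raising stage counts as None: only this chain stops
        if spec is None:
            break
        previous = await run_step(spec)
        if not previous["success"]:
            break  # later stages would only build on a failed result
    return previous


async def pipeline(items: list, stages: list[Stage]) -> list:
    # One chain per item, all in flight at once: no barrier between stages.
    return list(await asyncio.gather(*(chain(i, stages) for i in items)))


def explore(ctx: dict) -> dict:
    return {"task_id": f"explore-{ctx['item']}", "prompt": f"inspect {ctx['item']}"}


def review(ctx: dict) -> dict:
    return {"task_id": f"review-{ctx['item']}", "prompt": f"review {ctx['previous']['output']}"}


results = asyncio.run(pipeline(["a.ts", "FAIL.ts"], [explore, review]))
print([r["task_id"] for r in results])  # ['review-a.ts', 'explore-FAIL.ts']
```

The second item's explore step fails, so its review stage is never built, while the first item runs to the end of its chain.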
const outcomes = await session.pipeline(
  ['src/auth.ts', 'src/payments.ts'],
  [
    (ctx) => ({
      taskId: `explore-${ctx.item}`,
      agent: 'explore',
      description: 'Inspect file',
      prompt: `Summarize the responsibilities and risks of ${ctx.item}.`,
    }),
    (ctx) => {
      try {
        if (!ctx.previous) return null;
        return {
          taskId: `review-${ctx.item}`,
          agent: 'review',
          description: 'Review of prior finding',
          prompt: `Review this summary for correctness risks:\n${ctx.previous.output}`,
        };
      } catch {
        return null; // stages must not throw
      }
    },
  ],
);

def explore_stage(ctx):
    item = ctx["item"]
    return {
        "task_id": f"explore-{item}",
        "agent": "explore",
        "description": "Inspect file",
        "prompt": f"Summarize the responsibilities and risks of {item}.",
    }

def review_stage(ctx):
    prev = ctx["previous"]
    if prev is None:
        return None
    item = ctx["item"]
    return {
        "task_id": f"review-{item}",
        "agent": "review",
        "description": "Review of prior finding",
        "prompt": f"Review this summary for correctness risks:\n{prev['output']}",
    }

outcomes = session.pipeline(
    ["src/auth.ts", "src/payments.ts"],
    [explore_stage, review_stage],
)

Resumable / migratable workflows
session.parallelResumable(specs, workflowId) (Node) /
session.parallel_resumable(specs, workflow_id) (Python) is parallel plus a
journal. It maps to execute_steps_parallel_resumable.
At each step boundary it writes a WorkflowCheckpoint to the session store. On
resume it skips already-completed steps (reusing their cached outcomes) and
re-dispatches only the rest. It records only successful steps — a failed
step is not journaled, so it retries on resume. On full success the checkpoint
is deleted; only a crash leaves one behind for resume.
Because the checkpoint is serializable and the executor is a parameter, a host can resume an interrupted workflow on a different node (migration) by passing that node's executor.
This combinator requires a configured session store — both SDK methods
reject/raise without one (the Node error message is
parallelResumable requires a sessionStore).
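The journal semantics can be sketched in memory. MemoryStore, parallel_resumable, and run_step below are hypothetical. The real combinator persists a WorkflowCheckpoint to the session store at each step boundary, whereas this dict lives only in the process. The sketch shows the three rules from the text: reuse cached successes, journal only successful steps, and delete the checkpoint on full success. A raised Crash simulates the interrupted first attempt. Because run_step is a parameter, resuming with a different runner stands in for migrating to another node's executor.

```python
import asyncio
from typing import Optional


class Crash(Exception):
    """Simulates the process dying mid-workflow."""


class MemoryStore:
    """Hypothetical in-memory stand-in for the session store."""

    def __init__(self) -> None:
        self.checkpoints: dict = {}


async def parallel_resumable(specs, workflow_id, store: Optional[MemoryStore], run_step):
    if store is None:
        raise RuntimeError("parallel_resumable requires a session store")
    ckpt = store.checkpoints.setdefault(workflow_id, {"workflow_id": workflow_id, "steps": {}})
    done = ckpt["steps"]  # task_id -> cached successful outcome

    async def branch(spec):
        if spec["task_id"] in done:
            return done[spec["task_id"]]  # completed earlier: reuse, no re-dispatch
        outcome = await run_step(spec)
        if outcome["success"]:
            done[spec["task_id"]] = outcome  # journal successes only; failures retry
        return outcome

    outcomes = list(await asyncio.gather(*(branch(s) for s in specs)))
    if all(o["success"] for o in outcomes):
        del store.checkpoints[workflow_id]  # full success: nothing left to resume
    return outcomes


calls = []
crash_once = {"b"}  # hypothetical: step "b" crashes the process on its first attempt


async def run_step(spec):
    calls.append(spec["task_id"])
    if spec["task_id"] in crash_once:
        crash_once.discard(spec["task_id"])
        raise Crash("simulated crash")
    return {"task_id": spec["task_id"], "output": "ok", "success": True}


store = MemoryStore()
specs = [{"task_id": "a"}, {"task_id": "b"}]
try:
    asyncio.run(parallel_resumable(specs, "release-batch-42", store, run_step))
except Crash:
    pass  # first attempt dies; the checkpoint still holds step "a"
outcomes = asyncio.run(parallel_resumable(specs, "release-batch-42", store, run_step))
print(calls, store.checkpoints)  # ['a', 'b', 'b'] {}
```

Step "a" runs once, step "b" is re-dispatched on resume, and the checkpoint is gone once both succeed.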
A WorkflowCheckpoint carries four fields: schema_version, workflow_id, steps, and
checkpoint_ms. A checkpoint written with a future, incompatible
schema_version is rejected on load (ensure_loadable). That failure is
fail-safe, not fatal: an unreadable checkpoint logs a warning and the workflow
re-runs from scratch rather than resuming from state it can't interpret.
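A fail-safe version gate can be sketched as a loader that returns None, meaning "re-run from scratch", instead of raising. SUPPORTED_SCHEMA_VERSION and load_checkpoint are hypothetical. Treating any higher schema_version as incompatible is an assumption of the sketch, not necessarily the rule ensure_loadable applies.

```python
import json
import logging
from typing import Optional

SUPPORTED_SCHEMA_VERSION = 1  # hypothetical version this build understands


def load_checkpoint(raw: Optional[str]) -> Optional[dict]:
    """Return a resumable checkpoint, or None meaning: re-run from scratch."""
    if raw is None:
        return None  # no checkpoint: a fresh run
    try:
        ckpt = json.loads(raw)
        version = int(ckpt["schema_version"])
    except (ValueError, KeyError, TypeError):
        logging.warning("unreadable checkpoint; re-running from scratch")
        return None
    if version > SUPPORTED_SCHEMA_VERSION:
        # Written by a newer, incompatible build: refuse it without failing the workflow.
        logging.warning("checkpoint schema_version %d unsupported; re-running", version)
        return None
    return ckpt


fields = {"workflow_id": "release-batch-42", "steps": {}, "checkpoint_ms": 0}
current = json.dumps({"schema_version": 1, **fields})
future = json.dumps({"schema_version": 99, **fields})
print(load_checkpoint(current) is not None, load_checkpoint(future), load_checkpoint("{"))
```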
See Persistence for the store and Multi-Machine for the migration path.
// specs: the same array of step specs you would pass to parallel().
// First attempt: may be interrupted partway through.
let outcomes = await session.parallelResumable(specs, 'release-batch-42');
// After a crash/restart: the same workflowId resumes, skipping completed steps.
outcomes = await session.parallelResumable(specs, 'release-batch-42');

# specs: the same list of step specs you would pass to parallel().
# First attempt: may be interrupted partway through.
outcomes = session.parallel_resumable(specs, "release-batch-42")
# After a crash/restart: the same workflow_id resumes, skipping completed steps.
outcomes = session.parallel_resumable(specs, "release-batch-42")

Schema-forced step output
A spec carrying output_schema (outputSchema in Node) forces the step to
return a value conforming to that JSON Schema; the validated object lands in
StepOutcome.structured. This reuses the same structured-output coercion +
repair machinery as the rest of A3S Code. A coercion failure demotes the step
to unsuccessful (success: false), so callers never treat unvalidated text as
the promised object.
Forced schema applies to parallel / parallelResumable specs only — not
pipeline stages.
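The demotion rule can be sketched with a deliberately tiny validator that covers only type, enum, properties, and required. conforms and finish_step are hypothetical. The real path also coerces and repairs output before giving up, which this sketch omits. A production validator would use a full JSON Schema implementation.

```python
import json
from typing import Any

_TYPES = {"object": dict, "string": str}  # tiny subset, enough for the triage schema


def conforms(value: Any, schema: dict) -> bool:
    """Check a small subset of JSON Schema: type, enum, properties, required."""
    expected = _TYPES.get(schema.get("type"))
    if expected is not None and not isinstance(value, expected):
        return False
    if "enum" in schema and value not in schema["enum"]:
        return False
    if isinstance(value, dict):
        if any(key not in value for key in schema.get("required", [])):
            return False
        for key, sub in schema.get("properties", {}).items():
            if key in value and not conforms(value[key], sub):
                return False
    return True


def finish_step(task_id: str, text: str, schema: dict) -> dict:
    """Demote the step to success=False unless its text parses and conforms."""
    try:
        parsed = json.loads(text)
    except ValueError:
        parsed = None
    if parsed is not None and conforms(parsed, schema):
        return {"task_id": task_id, "output": text, "success": True, "structured": parsed}
    return {"task_id": task_id, "output": text, "success": False}


schema = {"type": "object",
          "properties": {"severity": {"type": "string", "enum": ["low", "medium", "high"]},
                         "summary": {"type": "string"}},
          "required": ["severity", "summary"]}
good = finish_step("triage", '{"severity": "high", "summary": "SQL built by concatenation"}', schema)
bad = finish_step("triage", '{"severity": "urgent", "summary": "?"}', schema)
print(good["success"], good["structured"]["severity"], bad["success"])  # True high False
```

A demoted outcome carries no structured field, so a caller that checks success first never reads unvalidated text as the promised object.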
const [outcome] = await session.parallel([
  {
    taskId: 'triage',
    agent: 'review',
    description: 'Structured triage',
    prompt: 'Triage this diff.',
    outputSchema: {
      type: 'object',
      properties: {
        severity: { type: 'string', enum: ['low', 'medium', 'high'] },
        summary: { type: 'string' },
      },
      required: ['severity', 'summary'],
    },
  },
]);
if (outcome.success) {
  console.log(outcome.structured.severity, outcome.structured.summary);
}

outcomes = session.parallel([
    {
        "task_id": "triage",
        "agent": "review",
        "description": "Structured triage",
        "prompt": "Triage this diff.",
        "output_schema": {
            "type": "object",
            "properties": {
                "severity": {"type": "string", "enum": ["low", "medium", "high"]},
                "summary": {"type": "string"},
            },
            "required": ["severity", "summary"],
        },
    },
])
outcome = outcomes[0]
if outcome["success"]:
    print(outcome["structured"]["severity"], outcome["structured"]["summary"])

Cost governance & lifecycle
Orchestrated steps run through the same session, so the session's controls apply
to them directly. setBudgetGuard (Node) / budget_guard (Python) bounds the
LLM cost of every step; close() cancels in-flight steps along with the rest of
the session's work; and the host-provided identity labels (tenant_id,
principal, agent_template_id, correlation_id) flow through each step for
host-side aggregation and billing. See Sessions and
Limits for the details of those controls.