Orchestration

Programmable, deterministic multi-agent orchestration: parallel fan-out, barrier-free pipelines, and resumable workflows

Orchestration is the programmable sibling of model-driven delegation. With Tasks and Teams the LLM decides, at run time, to call task / parallel_task — the shape of the fan-out is whatever the model chose. Orchestration moves that decision into your code: you express fan-out, pipelines, and verification panels as a grammar, so the shape is reproducible, testable, budget-bounded, and resumable — independent of what the model picks.

Reach for orchestration when the structure of the work is known to the host ahead of time (run these three reviewers in parallel; flow each candidate through explore → verify → review; resume this batch after a crash). Reach for Tasks/Teams when you want the model to decide whether and how to delegate.

The framework / host boundary (the seam)

Everything in this layer is written against a single seam, AgentExecutor: "run this step, give me the result." That seam splits responsibilities cleanly:

  • The framework owns the grammar — which steps exist, how they compose, the concurrency hint, and the serializable contracts AgentStepSpec / StepOutcome.
  • The host owns placement — transport, scheduling, and where a step actually runs.

The in-box default executor (TaskExecutor) runs every step locally, in-process, on tokio. A host substitutes its own AgentExecutor to place steps across a cluster; the combinators never observe where a step ran, so the same orchestration scales from one process to a cluster without change.

concurrency_hint() is advisory, not a hard local bound — it is the lever that lets orchestration scale past a single process (a scheduler-backed host returns its cluster-wide target instead of a local cap).
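A minimal Python model of the seam may make the split concrete. It is not the SDK's API: the Executor protocol, LocalExecutor, ClusterExecutor, and fan_out are hypothetical names chosen for illustration, and the real AgentExecutor and TaskExecutor live in the core framework. The sketch only shows that a combinator written against the seam behaves the same whichever executor it is handed, and that the concurrency target comes from the executor rather than from the combinator.

```python
import asyncio
from typing import Protocol


class Executor(Protocol):
    """Hypothetical stand-in for the AgentExecutor seam."""

    def concurrency_hint(self) -> int: ...

    async def run_step(self, spec: dict) -> dict: ...


class LocalExecutor:
    """Runs every step in-process (loosely analogous to the default executor)."""

    def concurrency_hint(self) -> int:
        return 4  # a local cap

    async def run_step(self, spec: dict) -> dict:
        return {"task_id": spec["task_id"], "output": f"local:{spec['prompt']}",
                "success": True}


class ClusterExecutor:
    """Pretends to place steps remotely; a real host would do transport here."""

    def concurrency_hint(self) -> int:
        return 64  # a cluster-wide target, not a local bound

    async def run_step(self, spec: dict) -> dict:
        return {"task_id": spec["task_id"], "output": f"remote:{spec['prompt']}",
                "success": True}


async def fan_out(executor: Executor, specs: list[dict]) -> list[dict]:
    """A combinator written only against the seam; it never learns where a step ran."""
    gate = asyncio.Semaphore(executor.concurrency_hint())

    async def one(spec: dict) -> dict:
        async with gate:
            return await executor.run_step(spec)

    # gather returns results in the order the awaitables were passed in
    return await asyncio.gather(*(one(spec) for spec in specs))
```

Swapping LocalExecutor for ClusterExecutor changes where the steps run and how many run at once, but not the fan_out code.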

The SDK wires this up for you: AgentSession::agent_executor() returns the session-backed executor (it runs each step as a child agent on this node, inheriting the session's agent registry, LLM client, workspace, and MCP tools), and session_store() returns the session's store. The parallel / pipeline / parallelResumable methods call these for you.

Step contracts

A step is described by an AgentStepSpec and resolves to a StepOutcome. Both are serializable on purpose: a host may ship a spec to another node, and the resumable combinator persists outcomes into checkpoints.

AgentStepSpec fields:

  • task_id — stable id for the step (you assign it); flows into lifecycle events and checkpoints.
  • agent — registry key of the agent to run (e.g. explore, review).
  • description — short human label for display/tracking.
  • prompt — the instruction handed to the child agent.
  • max_steps (optional) — per-step tool-round cap.
  • parent_session_id (optional) — parent session id for event correlation.
  • output_schema (optional) — when set, the step must return a value conforming to this JSON Schema (see Schema-forced step output).

StepOutcome fields:

  • task_id — the originating step's id.
  • session_id — the child run's session id (failed steps remain addressable).
  • agent — the agent that ran.
  • output — the step's text output.
  • success: false for a failed or panicked step (a failure never drops a sibling).
  • structured (optional) — schema-validated object, present only when the spec carried an output_schema.

The key casing differs by SDK:

Concept          Node (camelCase)    Python (snake_case)
step id          taskId              task_id
tool-round cap   maxSteps            max_steps
parent session   parentSessionId     parent_session_id
forced schema    outputSchema        output_schema

agent, description, prompt, output, success, and structured are spelled the same in both SDKs.
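If specs are shared between a Node host and a Python host (for example, stored as JSON), the casing difference can be bridged with a small key converter. The helpers below are hypothetical and not part of either SDK. They rename only the top-level keys, so property names inside an output_schema are left untouched.

```python
import re


def to_camel(spec: dict) -> dict:
    """Rename top-level snake_case keys to camelCase (task_id -> taskId)."""
    def camel(key: str) -> str:
        head, *rest = key.split("_")
        return head + "".join(part.capitalize() for part in rest)

    return {camel(key): value for key, value in spec.items()}


def to_snake(spec: dict) -> dict:
    """Rename top-level camelCase keys to snake_case (maxSteps -> max_steps)."""
    def snake(key: str) -> str:
        # insert "_" before every capital letter that is not at the start
        return re.sub(r"(?<!^)(?=[A-Z])", "_", key).lower()

    return {snake(key): value for key, value in spec.items()}


python_spec = {
    "task_id": "triage",
    "agent": "review",
    "max_steps": 5,
    "parent_session_id": "s-1",
    "output_schema": {"type": "object", "required": ["my_field"]},
}
node_spec = to_camel(python_spec)
```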

parallel — barrier fan-out

session.parallel(specs) runs every spec as a fan-out and resolves with one StepOutcome per spec, in input order. It maps to the core execute_steps_parallel combinator. It is a barrier: it awaits every step before returning.

Each branch is isolated — a step that fails or panics becomes success: false; it never drops a sibling. Concurrency is bounded by the executor's concurrency hint (the session's configured parallelism by default).

Node:

const outcomes = await session.parallel([
  { taskId: 'explore', agent: 'explore', description: 'Risky changes',
    prompt: 'Find risky changed files in this diff.' },
  { taskId: 'verify', agent: 'verification', description: 'Test gaps',
    prompt: 'Identify missing or weak verification.' },
  { taskId: 'review', agent: 'review', description: 'Correctness',
    prompt: 'Review the diff for correctness risks.' },
]);

for (const outcome of outcomes) {
  if (outcome.success) {
    console.log(outcome.taskId, outcome.output);
  } else {
    console.warn('failed:', outcome.taskId, outcome.output);
  }
}

Python:

outcomes = session.parallel([
    {"task_id": "explore", "agent": "explore", "description": "Risky changes",
     "prompt": "Find risky changed files in this diff."},
    {"task_id": "verify", "agent": "verification", "description": "Test gaps",
     "prompt": "Identify missing or weak verification."},
    {"task_id": "review", "agent": "review", "description": "Correctness",
     "prompt": "Review the diff for correctness risks."},
])

for outcome in outcomes:
    if outcome["success"]:
        print(outcome["task_id"], outcome["output"])
    else:
        print("failed:", outcome["task_id"], outcome["output"])
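The two guarantees described for parallel (one outcome per spec in input order, and a failing branch that never takes its siblings down) can be modeled in a few lines of asyncio. This is a sketch of the semantics, not the core execute_steps_parallel implementation. The names run_isolated and demo_step are hypothetical, and putting the error message in output is an assumption.

```python
import asyncio


async def run_isolated(specs, run_step, limit):
    """Run every spec concurrently (at most `limit` at once), one outcome per spec."""
    gate = asyncio.Semaphore(limit)

    async def branch(spec):
        async with gate:
            try:
                output = await run_step(spec)
                return {"task_id": spec["task_id"], "output": output, "success": True}
            except Exception as err:
                # A failure is converted into an outcome instead of propagating,
                # so sibling branches keep running.
                return {"task_id": spec["task_id"], "output": str(err), "success": False}

    # Barrier: gather waits for every branch and preserves input order.
    return await asyncio.gather(*(branch(spec) for spec in specs))


async def demo_step(spec):
    """Stand-in for a child agent run; the 'verify' step always fails."""
    await asyncio.sleep(0)
    if spec["task_id"] == "verify":
        raise RuntimeError("agent crashed")
    return f"done: {spec['prompt']}"


demo_specs = [
    {"task_id": "explore", "prompt": "find risky files"},
    {"task_id": "verify", "prompt": "find test gaps"},
    {"task_id": "review", "prompt": "review the diff"},
]
demo_outcomes = asyncio.run(run_isolated(demo_specs, demo_step, limit=2))
```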

pipeline — no barrier between stages

session.pipeline(items, stages) flows each item through a chain of stages independently — there is no barrier between stages, so item A can be in stage 3 while item B is still in stage 1. Wall-clock time is the slowest single chain, not the sum-of-slowest-per-stage that a per-stage barrier would incur.
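The wall-clock claim can be checked with a little arithmetic. The durations below are made-up numbers, and the sketch assumes enough concurrency for every item to run at once and that every item passes through every stage.

```python
def barrier_time(durations):
    """Per-stage barrier: each stage lasts as long as its slowest item."""
    return sum(max(stage) for stage in zip(*durations))


def pipelined_time(durations):
    """No barrier: total time is the slowest single item chain."""
    return max(sum(chain) for chain in durations)


# durations[item][stage], in seconds (illustrative values)
durations = [
    [5, 1, 1],  # item A is slow in stage 1
    [1, 5, 1],  # item B is slow in stage 2
]

print(barrier_time(durations))    # 11  (5 + 5 + 1)
print(pipelined_time(durations))  # 7   (max of 7 and 7)
```

With a barrier, item B waits for item A in stage 1 and item A waits for item B in stage 2. Without one, each item only ever waits on its own chain.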

Stages are spec-builders, not specs: each stage receives the prior outcome and the original item and returns the next step to run, or null / None to stop that item's chain early. A failed step also stops the chain (a later stage would only build on a failed result). The callback shapes:

  • Node: (ctx) => spec | null where ctx = { previous: StepOutcome | null, item }
  • Python: stage(ctx) -> spec | None where ctx = {"previous": <dict|None>, "item": <item>}

A stage can branch on the prior outcome — e.g. "verify the finding the review stage produced".

Constraints (from the source):

  • Per-stage output_schema is not supported on pipeline stages — use parallel for schema-validated steps.
  • Node: a stage callback must not throw — a throw aborts the process (the same constraint as setBudgetGuard). Wrap your logic in try/catch and return null on error.
  • Node: a stage that hangs past timeoutMs (the 3rd argument, default 30000) fails closed — it is treated as null, stopping only that chain.
  • Python: a stage callable that raises is caught and treated as None (stops only that chain).

Warning: a Node pipeline stage callback must not throw. In this napi version a JS throw at return-conversion aborts the process (the same fail-closed constraint as setBudgetGuard). Always wrap stage logic in try/catch and return null on error.

Node:

const outcomes = await session.pipeline(
  ['src/auth.ts', 'src/payments.ts'],
  [
    (ctx) => ({
      taskId: `explore-${ctx.item}`,
      agent: 'explore',
      description: 'Inspect file',
      prompt: `Summarize the responsibilities and risks of ${ctx.item}.`,
    }),
    (ctx) => {
      try {
        if (!ctx.previous) return null;
        return {
          taskId: `review-${ctx.item}`,
          agent: 'review',
          description: 'Review of prior finding',
          prompt: `Review this summary for correctness risks:\n${ctx.previous.output}`,
        };
      } catch {
        return null; // stages must not throw
      }
    },
  ],
);

Python:

def explore_stage(ctx):
    item = ctx["item"]
    return {
        "task_id": f"explore-{item}",
        "agent": "explore",
        "description": "Inspect file",
        "prompt": f"Summarize the responsibilities and risks of {item}.",
    }

def review_stage(ctx):
    prev = ctx["previous"]
    if prev is None:
        return None
    item = ctx["item"]
    return {
        "task_id": f"review-{item}",
        "agent": "review",
        "description": "Review of prior finding",
        "prompt": f"Review this summary for correctness risks:\n{prev['output']}",
    }

outcomes = session.pipeline(
    ["src/auth.ts", "src/payments.ts"],
    [explore_stage, review_stage],
)
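The per-item chain rules (a stage returning None stops the chain, a failed step stops the chain, and in Python a raising stage is treated as None) can be summarized as a small loop. This is a model of the documented behavior for a single item, not the SDK's implementation. The names run_chain and run_step are hypothetical, concurrency across items is left out, and returning the list of outcomes produced for the item is an assumption about shape.

```python
def run_chain(item, stages, run_step):
    """Flow one item through its stages, stopping early where the rules say so."""
    previous = None
    outcomes = []
    for stage in stages:
        try:
            spec = stage({"previous": previous, "item": item})
        except Exception:
            spec = None  # Python rule: a raising stage is treated as None
        if spec is None:
            break  # the stage chose to stop this item's chain
        previous = run_step(spec)
        outcomes.append(previous)
        if not previous["success"]:
            break  # a later stage would only build on a failed result
    return outcomes
```

Because each item runs its own loop, one item stopping early has no effect on any other item's chain.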

Resumable / migratable workflows

session.parallelResumable(specs, workflowId) (Node) / session.parallel_resumable(specs, workflow_id) (Python) is parallel plus a journal. It maps to execute_steps_parallel_resumable.

At each step boundary it writes a WorkflowCheckpoint to the session store. On resume it skips already-completed steps (reusing their cached outcomes) and re-dispatches only the rest. It records only successful steps — a failed step is not journaled, so it retries on resume. On full success the checkpoint is deleted; only a crash leaves one behind for resume.

Because the checkpoint is serializable and the executor is a parameter, a host can resume an interrupted workflow on a different node (migration) by passing that node's executor.

This combinator requires a configured session store — both SDK methods reject/raise without one (the Node error message is parallelResumable requires a sessionStore).

The WorkflowCheckpoint schema is schema_version / workflow_id / steps / checkpoint_ms. A checkpoint written by a future, incompatible schema_version is rejected on load (ensure_loadable). That failure is fail-safe, not fatal: an unreadable checkpoint logs a warning and the workflow re-runs from scratch rather than resuming from state it can't interpret.
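The fail-safe load can be pictured as follows. This is a sketch under assumptions, not the ensure_loadable source: the JSON encoding, the SUPPORTED_SCHEMA_VERSION value, and the load_checkpoint name are all hypothetical. Only the four field names come from the schema above.

```python
import json
import logging

SUPPORTED_SCHEMA_VERSION = 1  # hypothetical: the newest version this build understands


def load_checkpoint(raw):
    """Return the checkpoint dict, or None when it cannot be safely interpreted."""
    try:
        checkpoint = json.loads(raw)
        if checkpoint["schema_version"] > SUPPORTED_SCHEMA_VERSION:
            raise ValueError(f"unsupported schema_version {checkpoint['schema_version']}")
        return checkpoint
    except (ValueError, KeyError, TypeError) as err:
        # Fail-safe, not fatal: warn and let the caller re-run from scratch.
        logging.warning("ignoring unreadable checkpoint: %s", err)
        return None


current = load_checkpoint(
    '{"schema_version": 1, "workflow_id": "release-batch-42", "steps": [], "checkpoint_ms": 0}'
)
future = load_checkpoint(
    '{"schema_version": 99, "workflow_id": "release-batch-42", "steps": [], "checkpoint_ms": 0}'
)
```

A caller that receives None simply treats the workflow as having no checkpoint and dispatches every step.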

See Persistence for the store and Multi-Machine for the migration path.

Node:

// First attempt (may be interrupted partway through).
let outcomes = await session.parallelResumable(specs, 'release-batch-42');

// After a crash/restart: same workflowId resumes, skipping completed steps.
outcomes = await session.parallelResumable(specs, 'release-batch-42');

Python:

# First attempt (may be interrupted partway through).
outcomes = session.parallel_resumable(specs, "release-batch-42")

# After a crash/restart: same workflow_id resumes, skipping completed steps.
outcomes = session.parallel_resumable(specs, "release-batch-42")
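The journal rules can be modeled with a dict standing in for the session store. This is a sequential sketch of the described semantics, not execute_steps_parallel_resumable itself. The run_resumable name, the dict-backed store, and keying steps by task_id are assumptions, and the schema_version and checkpoint_ms fields are omitted for brevity.

```python
def run_resumable(specs, workflow_id, store, run_step):
    """Skip journaled steps, journal only successes, delete the journal on full success."""
    checkpoint = store.get(workflow_id, {"workflow_id": workflow_id, "steps": {}})
    done = checkpoint["steps"]
    outcomes = []
    for spec in specs:
        cached = done.get(spec["task_id"])
        if cached is not None:
            outcomes.append(cached)  # reuse the cached outcome, no re-dispatch
            continue
        outcome = run_step(spec)
        if outcome["success"]:
            done[spec["task_id"]] = outcome
            store[workflow_id] = checkpoint  # write at the step boundary
        # a failed step is not journaled, so a later resume retries it
        outcomes.append(outcome)
    if all(outcome["success"] for outcome in outcomes):
        store.pop(workflow_id, None)  # full success: nothing left to resume
    return outcomes
```

Passing a different run_step on the second call corresponds to resuming on another node with that node's executor: the journal decides what is skipped, and the executor decides where the remaining steps run.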

Schema-forced step output

A spec carrying output_schema (outputSchema in Node) forces the step to return a value conforming to that JSON Schema; the validated object lands in StepOutcome.structured. This reuses the same structured-output coercion + repair machinery as the rest of A3S Code. A coercion failure demotes the step to unsuccessful (success: false), so callers never treat unvalidated text as the promised object.

Forced schema applies to parallel / parallelResumable specs only — not pipeline stages.

Node:

const [outcome] = await session.parallel([
  {
    taskId: 'triage',
    agent: 'review',
    description: 'Structured triage',
    prompt: 'Triage this diff.',
    outputSchema: {
      type: 'object',
      properties: {
        severity: { type: 'string', enum: ['low', 'medium', 'high'] },
        summary: { type: 'string' },
      },
      required: ['severity', 'summary'],
    },
  },
]);

if (outcome.success) {
  console.log(outcome.structured.severity, outcome.structured.summary);
}

Python:

outcomes = session.parallel([
    {
        "task_id": "triage",
        "agent": "review",
        "description": "Structured triage",
        "prompt": "Triage this diff.",
        "output_schema": {
            "type": "object",
            "properties": {
                "severity": {"type": "string", "enum": ["low", "medium", "high"]},
                "summary": {"type": "string"},
            },
            "required": ["severity", "summary"],
        },
    },
])

outcome = outcomes[0]
if outcome["success"]:
    print(outcome["structured"]["severity"], outcome["structured"]["summary"])
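The demotion rule (a value that fails the schema turns the step into success: false) can be illustrated with a deliberately tiny checker. The conforms function below handles only the type, properties, required, and enum keywords used in the example above. It is not a JSON Schema validator and not the coercion and repair machinery A3S Code uses. Both conforms and finalize are hypothetical names.

```python
def conforms(value, schema):
    """Check a value against a very small subset of JSON Schema."""
    kind = schema.get("type")
    if kind == "object":
        if not isinstance(value, dict):
            return False
        if any(key not in value for key in schema.get("required", [])):
            return False
        properties = schema.get("properties", {})
        return all(conforms(value[key], sub)
                   for key, sub in properties.items() if key in value)
    if kind == "string" and not isinstance(value, str):
        return False
    return "enum" not in schema or value in schema["enum"]


def finalize(outcome, candidate, schema):
    """Attach the validated object, or demote the step when validation fails."""
    if conforms(candidate, schema):
        return {**outcome, "structured": candidate}
    return {**outcome, "success": False}  # never expose an unvalidated object


triage_schema = {
    "type": "object",
    "properties": {
        "severity": {"type": "string", "enum": ["low", "medium", "high"]},
        "summary": {"type": "string"},
    },
    "required": ["severity", "summary"],
}
base = {"task_id": "triage", "output": "...", "success": True}
good = finalize(base, {"severity": "high", "summary": "auth bypass"}, triage_schema)
bad = finalize(base, {"severity": "critical", "summary": "auth bypass"}, triage_schema)
```

Callers that check success before reading structured, as the examples above do, therefore never see the rejected value.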

Cost governance & lifecycle

Orchestrated steps run through the same session, so the session's controls apply to them directly. setBudgetGuard (Node) / budget_guard (Python) bounds the LLM cost of every step; close() cancels in-flight steps along with the rest of the session's work; and the host-provided identity labels (tenant_id, principal, agent_template_id, correlation_id) flow through each step for host-side aggregation and billing. See Sessions and Limits for the details of those controls.
