A3S Docs
A3S CodeExamples

Orchestration

Fan out independent work with session.parallel, build per-item chains with session.pipeline, and resume journaled runs with session.parallelResumable.

Orchestration

This page shows the three programmable orchestration primitives in A3S Code v3.4.0: session.parallel for fan-out, session.pipeline for per-item multi-stage chains, and session.parallelResumable for journaled runs that survive a crash. Use orchestration when you have several independent subagent tasks (parallel), or one transformation that flows through ordered stages per input (pipeline).

For the conceptual model behind these primitives, see Orchestration.

Fan-out with session.parallel

parallel takes an array of AgentStepSpec and runs them concurrently, returning one StepOutcome per spec in input order (not completion order). Each spec routes to a named subagent (explore, plan, review, verification, general, ...). Set outputSchema / output_schema on a spec to get a schema-validated structured result back.

import { Agent } from '@a3s-lab/code';

const agent = await Agent.create('agent.acl');
const session = agent.session('.', {});

// Independent steps; outcomes come back in input order, not completion order.
const outcomes = await session.parallel([
  { taskId: 'langs', agent: 'general', description: 'list', prompt: 'Name three systems languages.', maxSteps: 2 },
  { taskId: 'safe', agent: 'general', description: 'classify', prompt: 'Is Rust memory-safe without a GC? yes/no.', maxSteps: 2 },
]);

for (const o of outcomes) {
  console.log(`[parallel] ${o.taskId}: success=${o.success}`);
}

await session.close();
from a3s_code import Agent, SessionOptions

agent = Agent.create(open("agent.acl").read())
session = agent.session(".", SessionOptions())

# Independent steps; outcomes come back in input order, not completion order.
outcomes = session.parallel([
    {
        "task_id": "langs",
        "agent": "general",
        "description": "list languages",
        "prompt": "Name three systems programming languages, comma-separated.",
        "max_steps": 2,
    },
    {
        "task_id": "verdict",
        "agent": "general",
        "description": "classify",
        "prompt": "Is Rust memory-safe without a GC? Answer yes or no.",
        "max_steps": 2,
        # Schema-validated structured output for this step.
        "output_schema": {
            "type": "object",
            "properties": {"memory_safe": {"type": "boolean"}},
            "required": ["memory_safe"],
        },
    },
])

for o in outcomes:
    print(f"[parallel] {o['task_id']}: success={o['success']} structured={o.get('structured')}")

session.close()

Outcomes are dicts in Python (o['task_id'], o['success'], o.get('structured')) and objects in Node (o.taskId, o.success, o.structured). The maxParallelTasks / max_parallel_tasks session option caps concurrency; extra specs queue, and the outcome array is still returned in full, in order.

Per-item chains with session.pipeline

pipeline takes a list of input items and an ordered list of stages. Each item flows through the stages independently — there is no barrier between stages, so a fast item can reach stage 2 while a slow item is still in stage 1. A stage callback receives a ctx: the first stage sees ctx.item, later stages see ctx.previous (the prior StepOutcome, whose .output you build on). Return the next spec to continue, or null / None to stop that item's chain early.

import { Agent } from '@a3s-lab/code';

const agent = await Agent.create('agent.acl');
const session = agent.session('.', {});

// Stage 2 builds on stage 1's output. A stage callback MUST NOT throw —
// return null to stop this item's chain.
const results = await session.pipeline(
  ['the Rust programming language'],
  [
    (ctx) => ({
      taskId: 'sum',
      agent: 'general',
      description: 'summarize',
      prompt: `In one sentence, what is ${ctx.item}?`,
      maxSteps: 2,
    }),
    (ctx) => ({
      taskId: 'cls',
      agent: 'general',
      description: 'classify',
      prompt: `Reply YES or NO: does this describe a programming language?\n\n${ctx.previous.output}`,
      maxSteps: 2,
    }),
  ],
);

for (const r of results) {
  console.log(`[pipeline] final=${r === null ? null : JSON.stringify(r.output.slice(0, 60))}`);
}

await session.close();
from a3s_code import Agent, SessionOptions

agent = Agent.create(open("agent.acl").read())
session = agent.session(".", SessionOptions())

# Each item chains through stages; stage 2 builds on stage 1. Return None from a
# stage (or raise — caught and treated as None) to stop that item's chain.
results = session.pipeline(
    ["the Rust programming language"],
    [
        lambda ctx: {
            "task_id": "summarize",
            "agent": "general",
            "description": "summarize",
            "prompt": f"In one sentence, what is {ctx['item']}?",
            "max_steps": 2,
        },
        lambda ctx: {
            "task_id": "classify",
            "agent": "general",
            "description": "classify",
            "prompt": "Reply with one word YES or NO: does this describe a "
            f"programming language?\n\n{ctx['previous']['output']}",
            "max_steps": 2,
        },
    ],
)

for r in results:
    print(f"[pipeline] final={None if r is None else r['output'][:60]!r}")

session.close()

Key difference from parallel: stages are ordered and dependent, but items do not wait for each other between stages. Node stage callbacks must never throw — return null on error; Python stages may raise (a raised stage is caught and treated as None).

Resumable runs with session.parallelResumable

parallelResumable is parallel with a journal. It takes the specs first and a stable workflowId second; each step's outcome is journaled to the session's store, so if the process crashes mid-run you can call it again with the same workflowId and completed steps are replayed from the journal instead of re-executed. It requires a session store — pass sessionStore / session_store when opening the session, or the call throws.

import { Agent, FileSessionStore } from '@a3s-lab/code';

const agent = await Agent.create('agent.acl');
// parallelResumable journals to the session store; it throws without one.
const session = agent.session('.', {
  sessionStore: new FileSessionStore('./.a3s/sessions'),
});

// Signature is (specs, workflowId): specs first, stable workflowId second.
const outcomes = await session.parallelResumable(
  [
    { taskId: 'deps', agent: 'general', description: 'audit deps', prompt: 'Check manifests for outdated dependencies.', maxSteps: 2 },
    { taskId: 'tests', agent: 'verification', description: 'run tests', prompt: 'Run the test suite and summarize failures.', maxSteps: 2 },
  ],
  'nightly-audit',
);

// Re-running with the same workflowId replays completed steps from the journal.
console.log(outcomes.map((o) => `${o.taskId}:${o.success}`).join(' '));

await session.close();
from a3s_code import Agent, SessionOptions, FileSessionStore

agent = Agent.create(open("agent.acl").read())
# parallel_resumable journals to the session store; it raises without one.
opts = SessionOptions()
opts.session_store = FileSessionStore("./.a3s/sessions")
session = agent.session(".", opts)

# Signature is (specs, workflow_id): specs first, stable workflow_id second.
outcomes = session.parallel_resumable(
    [
        {"task_id": "deps", "agent": "general", "description": "audit deps", "prompt": "Check manifests for outdated dependencies.", "max_steps": 2},
        {"task_id": "tests", "agent": "verification", "description": "run tests", "prompt": "Run the test suite and summarize failures.", "max_steps": 2},
    ],
    "nightly-audit",
)

# Re-running with the same workflow_id replays completed steps from the journal.
print(" ".join(f"{o['task_id']}:{o['success']}" for o in outcomes))

session.close()

Notes:

  • All three primitives return outcomes aligned to input order: { taskId, success, output, error?, structured? } (Node objects) / { "task_id", "success", "output", "error"?, "structured"? } (Python dicts).
  • Set outputSchema / output_schema on a spec to get a parsed result back in structured.
  • maxSteps / max_steps caps the steps per subagent; maxParallelTasks / max_parallel_tasks (session option) caps fan-out concurrency.
  • Node pipeline stage callbacks must never throw — return null on error. Python stages may raise (a raised stage is caught and treated as None).

A runnable version ships at crates/code/sdk/node/examples/orchestration/parallel-pipeline.mjs and crates/code/sdk/python/examples/orchestration_workflow.py.

On this page