
Lane Queue

Priority-based tool execution with parallel reads and multi-machine external processing

The Lane Queue system routes tool execution through a priority-based queue backed by a3s-lane v0.4.0. When enabled, read-only tools execute in parallel while write tools maintain sequential ordering.

Fully integrated with A3S Lane v0.4.0 — includes retry policies, rate limiting, priority boost, pressure monitoring, and per-lane timeouts.

How It Works

Each tool call is mapped to a lane based on its type:

  • control — P0 (highest priority)
  • query — P1 (read-only tools: read, glob, grep)
  • execute — P2 (side-effect tools: bash, write)
  • generate — P3 (lowest priority; LLM generation)

Higher-priority lanes are scheduled before lower-priority lanes — a Query (P1) task submitted after Execute (P2) tasks will be scheduled ahead of queued Execute tasks. See Lane-Based Priority Preemption Examples for multi-language SDK demos.
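The scheduling rule above can be sketched with a plain priority queue — a minimal illustration of the ordering behavior, not the a3s-lane implementation:

```python
import heapq
import itertools

# Lane priorities as documented: lower number = scheduled first.
PRIORITY = {"control": 0, "query": 1, "execute": 2, "generate": 3}

class LaneQueue:
    """Toy scheduler: pops tasks by lane priority, FIFO within a lane."""
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker preserves submission order

    def submit(self, lane, task):
        heapq.heappush(self._heap, (PRIORITY[lane], next(self._seq), task))

    def next_task(self):
        return heapq.heappop(self._heap)[2]

q = LaneQueue()
q.submit("execute", "bash: cargo test")  # P2, submitted first
q.submit("execute", "write: main.rs")    # P2
q.submit("query", "read: Cargo.toml")    # P1, submitted last

print(q.next_task())  # the query task jumps ahead of the queued execute tasks
```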

When the LLM returns multiple tool calls in a single turn:

  1. Query-lane tools are submitted to the queue and executed in parallel (up to query_max_concurrency, default 4)
  2. All other tools execute sequentially in order, preserving side-effect ordering
  3. All pre-execution checks (permissions, HITL, hooks, skill filters) run before queue submission
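The dispatch order above, sketched with a thread pool — illustrative only, not the queue's actual executor:

```python
from concurrent.futures import ThreadPoolExecutor

QUERY_MAX_CONCURRENCY = 4  # documented default

def run_tool_calls(query_calls, other_calls, execute):
    """Run query-lane calls in parallel, everything else in submission order."""
    results = {}
    # 1. Read-only tools fan out, capped at query_max_concurrency.
    with ThreadPoolExecutor(max_workers=QUERY_MAX_CONCURRENCY) as pool:
        futures = {name: pool.submit(execute, name) for name in query_calls}
        results.update({name: f.result() for name, f in futures.items()})
    # 2. Remaining tools run one by one, preserving side-effect ordering.
    for name in other_calls:
        results[name] = execute(name)
    return results

order = []
def fake_execute(name):
    order.append(name)
    return f"ok:{name}"

out = run_tool_calls(["read:a", "read:b"], ["bash:test", "write:x"], fake_execute)
```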

Enabling the Queue

Python

from a3s_code import Agent, SessionQueueConfig

qc = SessionQueueConfig()
qc.with_lane_features()
session = agent.session("/project", queue_config=qc)

TypeScript

const session = agent.session('/project', {
  queueConfig: { enableAllFeatures: true },
});

Queue Features

Dead Letter Queue (DLQ)

Commands that fail after exhausting retry attempts (3 by default, exponential backoff) are moved to the DLQ. Query dead letters via session.dead_letters().
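A minimal sketch of that retry-then-DLQ rule (an illustration of the behavior, not the queue's internals):

```python
MAX_RETRIES = 3  # documented default

def run_with_dlq(command, attempt_fn, dlq):
    """Retry a command up to MAX_RETRIES times; park it in the DLQ on exhaustion."""
    for attempt in range(1 + MAX_RETRIES):  # initial try + retries
        try:
            return attempt_fn(command, attempt)
        except Exception as err:
            last_error = err
    dlq.append({"command": command, "error": str(last_error), "attempts": 1 + MAX_RETRIES})
    return None

dlq = []
def always_fails(command, attempt):
    raise RuntimeError(f"attempt {attempt} failed")

run_with_dlq("bash: flaky", always_fails, dlq)
print(dlq[0])  # dead letter records the command, final error, and attempt count
```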

Metrics

When enabled, the queue collects per-lane counters, gauges, and histograms (latency percentiles). Access via session.queue_metrics().

Alerts

Queue alert events are available in the Rust SDK. In TypeScript, monitor queue depth via session.queueStats().

Advanced Features (A3S Lane v0.4.0)

1. Retry Policy

Automatically retry failed tasks with configurable strategies.

Configuration:

queue {
  retry_policy {
    strategy = "exponential"  # exponential, fixed, or none
    max_retries = 3
    initial_delay_ms = 100
  }
}

Strategies:

  • exponential — Exponential backoff (100ms → 200ms → 400ms)
  • fixed — Fixed delay between retries (requires fixed_delay_ms)
  • none — No automatic retries

Example (Fixed Delay):

queue {
  retry_policy {
    strategy = "fixed"
    max_retries = 5
    fixed_delay_ms = 1000  # 1 second between retries
  }
}
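The delay schedules for the three strategies reduce to a small computation — a sketch matching the numbers above:

```python
def retry_delays(strategy, max_retries, initial_delay_ms=100, fixed_delay_ms=None):
    """Delay (ms) before each retry attempt, per the documented strategies."""
    if strategy == "exponential":
        return [initial_delay_ms * 2 ** i for i in range(max_retries)]
    if strategy == "fixed":
        return [fixed_delay_ms] * max_retries
    return []  # "none": no automatic retries

print(retry_delays("exponential", 3))                 # [100, 200, 400]
print(retry_delays("fixed", 5, fixed_delay_ms=1000))  # [1000, 1000, 1000, 1000, 1000]
```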

2. Rate Limiting

Control the maximum number of operations per time window.

Configuration:

queue {
  rate_limit {
    limit_type = "per_second"  # per_second, per_minute, per_hour, unlimited
    max_operations = 100
  }
}

Limit Types:

  • per_second — Maximum operations per second
  • per_minute — Maximum operations per minute
  • per_hour — Maximum operations per hour
  • unlimited — No rate limiting

Rate limiting requires the distributed feature in a3s-lane.
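A sliding-window sketch of the limiter semantics (illustrative only; a3s-lane's implementation may differ):

```python
from collections import deque

WINDOWS_MS = {"per_second": 1_000, "per_minute": 60_000, "per_hour": 3_600_000}

class RateLimiter:
    """Allow at most max_operations per window; "unlimited" always admits."""
    def __init__(self, limit_type, max_operations):
        self.window_ms = WINDOWS_MS.get(limit_type)  # None => unlimited
        self.max_operations = max_operations
        self.stamps = deque()

    def try_acquire(self, now_ms):
        if self.window_ms is None:
            return True
        # Drop timestamps that fell out of the window.
        while self.stamps and now_ms - self.stamps[0] >= self.window_ms:
            self.stamps.popleft()
        if len(self.stamps) < self.max_operations:
            self.stamps.append(now_ms)
            return True
        return False

rl = RateLimiter("per_second", max_operations=2)
print(rl.try_acquire(0), rl.try_acquire(10), rl.try_acquire(20))  # third call is throttled
print(rl.try_acquire(1_000))  # window rolled over, admitted again
```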

3. Priority Boost

Automatically boost task priority as deadlines approach.

Configuration:

queue {
  priority_boost {
    strategy = "standard"  # standard, aggressive, or disabled
    deadline_ms = 300000   # 5 minutes
  }
}

Strategies:

  • standard — Boost at 75%, 50%, 25% remaining time
  • aggressive — Boost earlier and more frequently
  • disabled — No priority boost

Priority boost requires the distributed feature in a3s-lane.

How It Works:

Task submitted with deadline_ms = 300000 (5 minutes)

Time remaining: 225s (75%) → Priority +1
Time remaining: 150s (50%) → Priority +2
Time remaining: 75s (25%)  → Priority +3
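The standard schedule above maps remaining time to a boost like so (a sketch of the documented thresholds):

```python
def standard_boost(remaining_ms, deadline_ms):
    """Priority boost at the documented 75% / 50% / 25% remaining-time thresholds."""
    fraction = remaining_ms / deadline_ms
    if fraction <= 0.25:
        return 3
    if fraction <= 0.50:
        return 2
    if fraction <= 0.75:
        return 1
    return 0

deadline = 300_000  # 5 minutes
print(standard_boost(225_000, deadline))  # 75% remaining -> +1
print(standard_boost(150_000, deadline))  # 50% remaining -> +2
print(standard_boost(75_000, deadline))   # 25% remaining -> +3
```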

4. Pressure Monitoring

Emit events when queue pressure exceeds threshold.

Configuration:

queue {
  pressure_threshold = 50  # Emit event when pending tasks >= 50
}

Events:

  • queue.lane.pressure — Emitted when pending tasks >= threshold
  • queue.lane.idle — Emitted when queue becomes empty

Use Cases:

  • Auto-scaling triggers
  • Alert notifications
  • Performance monitoring
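The event rule can be sketched as follows — event names are the documented ones, the emitter itself is a toy:

```python
PRESSURE_THRESHOLD = 50  # documented example threshold

def pressure_events(pending_counts, threshold=PRESSURE_THRESHOLD):
    """Return (event, pending) pairs for a sequence of observed queue depths."""
    events = []
    for pending in pending_counts:
        if pending >= threshold:
            events.append(("queue.lane.pressure", pending))
        elif pending == 0:
            events.append(("queue.lane.idle", pending))
    return events

print(pressure_events([10, 49, 50, 120, 3, 0]))
```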

5. Per-Lane Timeouts

Override default timeout for specific lanes.

Configuration:

queue {
  default_timeout_ms = 60000  # Global default: 60 seconds

  lane_timeouts {
    query = 30000      # Query lane: 30 seconds
    execute = 120000   # Execute lane: 2 minutes
  }
}

Priority: Lane-specific timeout > Default timeout
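That resolution order is a lookup with a fallback — sketched here with the values from the config above:

```python
DEFAULT_TIMEOUT_MS = 60_000
LANE_TIMEOUTS = {"query": 30_000, "execute": 120_000}

def timeout_for(lane):
    """Lane-specific timeout wins; otherwise fall back to the global default."""
    return LANE_TIMEOUTS.get(lane, DEFAULT_TIMEOUT_MS)

print(timeout_for("query"))     # 30000
print(timeout_for("generate"))  # 60000
```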

Complete Advanced Configuration

queue {
  # Concurrency limits
  control_max_concurrency = 2
  query_max_concurrency = 10
  execute_max_concurrency = 5
  generate_max_concurrency = 1

  # Basic features
  enable_metrics = true
  enable_dlq = true
  enable_alerts = true
  storage_path = "./queue_data"
  default_timeout_ms = 60000

  # Advanced: Retry policy
  retry_policy {
    strategy = "exponential"
    max_retries = 3
    initial_delay_ms = 100
  }

  # Advanced: Rate limiting
  rate_limit {
    limit_type = "per_second"
    max_operations = 100
  }

  # Advanced: Priority boost
  priority_boost {
    strategy = "standard"
    deadline_ms = 300000
  }

  # Advanced: Pressure monitoring
  pressure_threshold = 50

  # Advanced: Per-lane timeouts
  lane_timeouts {
    query = 30000
    execute = 120000
  }
}

Queue Status API

The SDK exposes real-time queue introspection on every session with a queue configured.

SessionQueueStats

TypeScript

const stats = await session.queueStats();
console.log(`Pending: ${stats.totalPending}, Active: ${stats.totalActive}`);

// External tasks waiting for completion
const external = await session.pendingExternalTasks();
console.log(`External pending: ${external.length}`);
for (const task of external) {
  console.log(`  [${task.task_id}] ${task.command_type} in ${task.lane}, ${task.remaining_ms}ms left`);
}
Python

stats = session.queue_stats()
print(f"Pending: {stats['total_pending']}, Active: {stats['total_active']}")

# External tasks waiting for completion
external = session.pending_external_tasks()
print(f"External pending: {len(external)}")
for task in external:
    print(f"  [{task['task_id']}] {task['command_type']} in {task['lane']}, {task['remaining_ms']}ms left")

Metrics and Dead Letters
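
Both accessors can be polled together. The Session stub below stands in for the SDK so the sketch runs standalone; the metric field names inside it are illustrative assumptions, not the documented schema:

```python
class StubSession:
    """Stand-in for an A3S Code session; mimics only the two accessors used here."""
    def queue_metrics(self):
        return {"query": {"completed": 12, "p99_latency_ms": 85}}
    def dead_letters(self):
        return [{"command": "bash: cargo bench", "error": "timeout"}]

def report(session):
    """Summarize per-lane metrics and any dead-lettered commands."""
    lines = []
    for lane, m in session.queue_metrics().items():
        lines.append(f"{lane}: {m['completed']} done, p99 {m['p99_latency_ms']}ms")
    for dl in session.dead_letters():
        lines.append(f"DLQ: {dl['command']} ({dl['error']})")
    return lines

for line in report(StubSession()):
    print(line)
```

Against a real session, pass the session object in place of StubSession.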

External Task Handling

Any lane can be switched to External mode, where tool calls in that lane are not executed locally — instead they become ExternalTask objects that your SDK code must poll, process (locally or on a remote machine), and complete via callback. This is the mechanism for multi-machine parallel execution.

Handler Modes

  • internal — Tools execute locally (the default)
  • external — Tool calls become ExternalTask objects for remote processing
  • hybrid — Tools execute locally while ExternalTaskPending events are emitted for observers

ExternalTask

When a tool call enters a lane in External mode, the queue creates an ExternalTask:

  • task_id — Unique identifier for the task
  • lane — Lane the tool call was routed to
  • command_type — Type of the underlying tool call (e.g. "bash")
  • payload — Tool arguments to execute
  • timeout_ms — Time allowed before the task fails with a timeout

Methods: is_timed_out() checks if the task has expired, remaining_ms() returns remaining time.

ExternalTaskResult

Complete a task by providing:

  • success — Whether the task completed successfully
  • result — Result payload returned to the agent loop (e.g. output, exit code)
  • error — Error message when success is false

Lifecycle

Agent loop: LLM returns tool call (e.g., bash("cargo test"))

    ├─ Lane routing: "bash" → Execute lane

    ├─ Execute lane is External mode
    │   │
    │   ├─ 1. Create ExternalTask with task_id, payload, timeout
    │   ├─ 2. Store in pending external tasks map
    │   ├─ 3. Emit ExternalTaskPending event
    │   ├─ 4. Block — waiting for completion or timeout
    │   │
    │   │    ┌──────── SDK / Remote Worker ────────┐
    │   │    │ Poll pending_external_tasks()       │
    │   │    │ Extract payload, execute remotely   │
    │   │    │ Call complete_external_task(result) │
    │   │    └─────────────────────────────────────┘
    │   │
    │   ├─ 5. Receive result via oneshot channel
    │   ├─ 6. Emit ExternalTaskCompleted event
    │   └─ 7. Return result to agent loop

    └─ Agent continues with tool result

If the SDK doesn't call complete_external_task() within timeout_ms, the task fails with a timeout error and the agent loop handles it as a tool failure.
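The pending-map/complete contract in steps 2–5 can be simulated without the SDK — method names mirror the documented API, but this is a sketch, not the implementation:

```python
import time

class ExternalTaskBroker:
    """Toy pending-task map: create, poll, complete, and time out external tasks."""
    def __init__(self):
        self.pending = {}
        self.results = {}

    def create(self, task_id, payload, timeout_ms, now_ms=None):
        now = now_ms if now_ms is not None else time.time() * 1000
        self.pending[task_id] = {"payload": payload, "deadline_ms": now + timeout_ms}

    def pending_external_tasks(self):
        return list(self.pending)

    def complete_external_task(self, task_id, result):
        self.pending.pop(task_id)
        self.results[task_id] = result

    def is_timed_out(self, task_id, now_ms):
        return now_ms >= self.pending[task_id]["deadline_ms"]

broker = ExternalTaskBroker()
broker.create("t1", {"command": "cargo test"}, timeout_ms=60_000, now_ms=0)
print(broker.pending_external_tasks())  # one task pending: t1
broker.complete_external_task("t1", {"success": True, "result": {"exit_code": 0}})
print(broker.results["t1"]["success"])  # True
```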

Multi-Machine Parallel Processing

The core pattern: run the agent session on a coordinator machine, route specific lanes to External mode, and have worker processes on remote machines poll and execute those tasks.

Architecture

┌──────────────────────────────────────────────────────┐
│ Coordinator (runs agent session)                     │
│                                                      │
│  Agent Loop ──→ Lane Queue                           │
│                    ├─ Query lane    [Internal]       │
│                    │   └─ read, glob, grep → local   │
│                    ├─ Execute lane  [External]       │
│                    │   └─ bash, write → ExternalTask │
│                    └─ Generate lane [Internal]       │
│                        └─ LLM calls → local          │
│                                                      │
│ pending_external_tasks() ──→ task queue ──→ workers  │
│ complete_external_task() ←── results ←── workers     │
└───────────────────────────┬──────────────────────────┘
                            │ network (gRPC, HTTP, Redis, etc.)
           ┌────────────────┼────────────────┐
           ▼                ▼                ▼
    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
    │  Worker A   │  │  Worker B   │  │  Worker C   │
    │  (machine)  │  │  (machine)  │  │  (machine)  │
    │  bash exec  │  │  bash exec  │  │  bash exec  │
    └─────────────┘  └─────────────┘  └─────────────┘

Coordinator: Setup and Event Loop

TypeScript

import { Agent } from '@a3s-lab/code';

const agent = await Agent.create('agent.hcl');

// 1. Create session with queue
const session = agent.session('/project', {
  queueConfig: { enableAllFeatures: true, defaultTimeoutMs: 60000 },
});

// 2. Route Execute lane to external handlers
await session.setLaneHandler('execute', {
  mode: 'external',
  timeoutMs: 60000,
});

// 3. Stream and handle external tasks
const events = await session.stream(
  'Run the full test suite, build for release, and deploy to staging'
);
for (const event of events) {
  if (event.type === 'external_task_pending') {
    const tasks = await session.pendingExternalTasks();
    await Promise.all(tasks.map(async (task) => {
      // dispatchToWorker is your transport layer
      const workerResult = await dispatchToWorker(task);

      await session.completeExternalTask(task.task_id, {
        success: workerResult.success,
        result: { output: workerResult.output, exit_code: workerResult.exitCode },
        error: workerResult.error,
      });
    }));
  }
  if (event.type === 'text_delta') process.stdout.write(event.text);
  if (event.type === 'end') break;
}
Python

import asyncio
from a3s_code import Agent, SessionQueueConfig

agent = Agent.create("agent.hcl")

# 1. Create session with queue
qc = SessionQueueConfig()
qc.with_lane_features()
qc.set_default_timeout(60000)
session = agent.session("/project", queue_config=qc)

# 2. Route Execute lane to external handlers
session.set_lane_handler("execute", mode="external", timeout_ms=60000)

# 3. Stream and handle external tasks
for event in session.stream("Run the full test suite, build for release, and deploy to staging"):
    if event.event_type == "external_task_pending":
        tasks = session.pending_external_tasks()

        async def handle(task):
            # dispatch_to_worker is your transport layer
            worker_result = await dispatch_to_worker(task)
            session.complete_external_task(task["task_id"], {
                "success": worker_result.success,
                "result": {"output": worker_result.output, "exit_code": worker_result.exit_code},
                "error": worker_result.error,
            })

        # asyncio.gather needs a running loop, so wrap it in a coroutine
        async def gather_all():
            await asyncio.gather(*[handle(t) for t in tasks])

        asyncio.run(gather_all())
    elif event.event_type == "text_delta":
        print(event.text, end="", flush=True)
    elif event.event_type == "end":
        break

Worker Process (Remote Machine)

The worker is a standalone process that receives tasks via your transport layer, executes them, and returns results:

The transport layer between coordinator and workers is your choice — A3S Code provides the ExternalTask/ExternalTaskResult contract, you provide the plumbing (gRPC, HTTP, Redis, NATS, SQS, etc.).
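A worker sketch using an in-memory queue as a stand-in transport — swap in gRPC/HTTP/Redis as needed; the payload shape here is an assumption based on the coordinator examples above:

```python
import queue
import subprocess

def worker_loop(task_queue, result_queue):
    """Pull ExternalTask payloads off the transport, run them, push results back."""
    while True:
        task = task_queue.get()
        if task is None:  # shutdown sentinel
            break
        proc = subprocess.run(
            task["payload"]["command"], shell=True,
            capture_output=True, text=True,
        )
        result_queue.put({
            "task_id": task["task_id"],
            "success": proc.returncode == 0,
            "result": {"output": proc.stdout, "exit_code": proc.returncode},
            "error": proc.stderr or None,
        })

tasks, results = queue.Queue(), queue.Queue()
tasks.put({"task_id": "t1", "payload": {"command": "echo hello"}})
tasks.put(None)
worker_loop(tasks, results)
print(results.get())
```

On the coordinator side, each result would be fed back through complete_external_task().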

Hybrid Mode: Local Execution + External Notification

Use Hybrid mode when you want local execution as the default path but also need external visibility — for monitoring, audit logging, or shadow processing on remote machines:

TypeScript

// Hybrid: execute locally, also notify external watchers
await session.setLaneHandler('execute', { mode: 'hybrid', timeoutMs: 60000 });

// Tools run locally. ExternalTaskPending events emitted for observation.
const result = await session.send('Run cargo test and fix any failures');
Python

# Hybrid: execute locally, also notify external watchers
session.set_lane_handler("execute", mode="hybrid", timeout_ms=60000)

# Tools run locally. ExternalTaskPending events emitted for observation.
result = session.send("Run cargo test and fix any failures")

Dynamic Lane Switching

Lanes can be switched between modes at runtime. This enables adaptive strategies — start in Internal mode, switch to External when workers are available:
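Sketching that adaptive pattern: start Internal, flip a lane to External once workers come online. The router below is a toy; only the set_lane_handler name mirrors the documented call:

```python
class LaneRouter:
    """Toy router: dispatch locally or emit an external task, per lane mode."""
    def __init__(self):
        self.modes = {"query": "internal", "execute": "internal"}

    def set_lane_handler(self, lane, mode):
        self.modes[lane] = mode

    def dispatch(self, lane, call, run_local):
        if self.modes[lane] == "external":
            return {"external_task": {"lane": lane, "payload": call}}
        return {"result": run_local(call)}

router = LaneRouter()
run = lambda call: f"local:{call}"

print(router.dispatch("execute", "cargo test", run))  # executed locally

# Workers came online: route the Execute lane to external handlers.
router.set_lane_handler("execute", mode="external")
print(router.dispatch("execute", "cargo test", run))  # now an external task
```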

Planning + External Execution

When planning is enabled, each plan step goes through the same agent loop and lane queue. This means externally-handled lanes work transparently with execution plans:

Configuration Reference

Lane handlers can also be pre-configured in SessionQueueConfig instead of calling set_lane_handler() at runtime:
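One plausible shape for that pre-configuration, mirroring the runtime set_lane_handler() arguments — the block and field names here are an assumption, so check the SDK reference before relying on them:

```hcl
queue {
  # Assumed shape: pre-declare lane handlers instead of calling
  # set_lane_handler() at runtime
  lane_handlers {
    execute = {
      mode       = "external"
      timeout_ms = 60000
    }
  }
}
```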

Backward Compatibility

When no queue_config is set in SessionOptions, the queue is not created and all tools execute sequentially via direct ToolExecutor calls, exactly as before. The queue is purely opt-in.

Direct Queue Submission

Use submit() and submit_batch() to inject tasks directly into the lane queue from your SDK code — bypassing the LLM turn. This is useful for proactively scheduling work, fan-out patterns, or driving the queue from external orchestrators.

TypeScript

// Submit a single task to the Execute lane
const result = await session.submit(
  'execute',
  { command: 'cargo test --workspace' },
);
console.log(result);

// Submit multiple tasks to the Query lane in one call (executed in parallel)
const results = await session.submitBatch(
  'query',
  [
    { path: 'src/main.rs' },
    { path: 'src/lib.rs' },
    { path: 'Cargo.toml' },
  ],
);
results.forEach(r => console.log(r));
Python

# Submit a single task to the Execute lane
result = session.submit(
    "execute",
    {"command": "cargo test --workspace"},
)
print(result)

# Submit multiple tasks to the Query lane in one call (executed in parallel)
results = session.submit_batch(
    "query",
    [
        {"path": "src/main.rs"},
        {"path": "src/lib.rs"},
        {"path": "Cargo.toml"},
    ],
)
for r in results:
    print(r)

Lane names

Lane      String value  Priority
Control   "control"     P0 (highest)
Query     "query"       P1
Execute   "execute"     P2
Generate  "generate"    P3 (lowest)

submit_batch() submits all payloads to the same lane. Query-lane batches execute in parallel (up to query_max_concurrency). All other lanes execute sequentially.

API Reference

SessionQueueConfig fields

  • control_max_concurrency / query_max_concurrency / execute_max_concurrency / generate_max_concurrency — Per-lane concurrency limits
  • enable_metrics — Collect per-lane counters, gauges, and histograms
  • enable_dlq — Move commands that exhaust retries to the dead letter queue
  • enable_alerts — Emit queue alert events (Rust SDK)
  • storage_path — On-disk location for queue data
  • default_timeout_ms — Global task timeout
  • retry_policy — Retry strategy configuration
  • rate_limit — Rate limiting configuration
  • priority_boost — Deadline-based priority boost configuration
  • pressure_threshold — Pending-task count that triggers pressure events
  • lane_timeouts — Per-lane timeout overrides

LaneHandlerConfig fields

  • mode — "internal", "external", or "hybrid"
  • timeout_ms — Timeout for external completion in this lane

RetryPolicy fields

  • strategy — "exponential", "fixed", or "none"
  • max_retries — Maximum retry attempts (default 3)
  • initial_delay_ms — First delay for exponential backoff
  • fixed_delay_ms — Delay between retries for the fixed strategy
