Lane Queue
Priority-based tool execution with parallel reads and multi-machine external processing
The Lane Queue system routes tool execution through a priority-based queue backed by a3s-lane v0.4.0. When enabled, read-only tools execute in parallel while write tools maintain sequential ordering.
Fully integrated with A3S Lane v0.4.0 — includes retry policies, rate limiting, priority boost, pressure monitoring, and per-lane timeouts.
How It Works
Each tool call is mapped to a lane based on its type; the four lanes and their priorities are listed in the Lane names table under Direct Queue Submission.
Higher-priority lanes are scheduled before lower-priority lanes — a Query (P1) task submitted after Execute (P2) tasks will be scheduled ahead of queued Execute tasks. See Lane-Based Priority Preemption Examples for multi-language SDK demos.
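The ordering rule can be sketched with a priority heap. The lane-to-priority mapping below follows the Lane names table later in this page; the scheduler itself is a simplified stand-in for a3s-lane, not its actual implementation:

```python
import heapq
import itertools

# Lane priorities from the Lane names table: lower number = scheduled first.
LANE_PRIORITY = {"control": 0, "query": 1, "execute": 2, "generate": 3}

_counter = itertools.count()  # FIFO tie-breaker within a lane

def submit(heap, lane, task):
    heapq.heappush(heap, (LANE_PRIORITY[lane], next(_counter), task))

def drain(heap):
    order = []
    while heap:
        _, _, task = heapq.heappop(heap)
        order.append(task)
    return order

heap = []
submit(heap, "execute", "bash: cargo build")
submit(heap, "execute", "write: src/lib.rs")
submit(heap, "query", "read: Cargo.toml")  # submitted last, scheduled first

print(drain(heap))
# → ['read: Cargo.toml', 'bash: cargo build', 'write: src/lib.rs']
```

Within a lane, tasks keep their submission order; across lanes, the lower priority number always wins.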
When the LLM returns multiple tool calls in a single turn:
- Query-lane tools are submitted to the queue and executed in parallel (up to `query_max_concurrency`, default 4)
- All other tools execute sequentially in order, preserving side-effect ordering
- All pre-execution checks (permissions, HITL, hooks, skill filters) run before queue submission
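That dispatch rule for a single multi-tool turn can be sketched as follows; the tool functions are stand-ins, and the query-lane tool set is assumed from the examples elsewhere on this page:

```python
import asyncio

QUERY_TOOLS = {"read", "glob", "grep"}  # assumption: typical query-lane tools
QUERY_MAX_CONCURRENCY = 4               # default from the docs above

async def run_tool(name, arg):
    await asyncio.sleep(0)  # stand-in for real tool execution
    return f"{name}({arg})"

async def dispatch_turn(tool_calls):
    sem = asyncio.Semaphore(QUERY_MAX_CONCURRENCY)
    results = {}

    async def run_query(i, name, arg):
        async with sem:  # at most 4 query tools in flight
            results[i] = await run_tool(name, arg)

    # Query-lane tools run in parallel...
    queries = [run_query(i, n, a) for i, (n, a) in enumerate(tool_calls) if n in QUERY_TOOLS]
    await asyncio.gather(*queries)

    # ...all other tools run sequentially, preserving side-effect order.
    for i, (name, arg) in enumerate(tool_calls):
        if name not in QUERY_TOOLS:
            results[i] = await run_tool(name, arg)

    return [results[i] for i in range(len(tool_calls))]

out = asyncio.run(dispatch_turn([("read", "a.rs"), ("bash", "ls"), ("grep", "todo")]))
print(out)
```

Results are returned in the original tool-call order regardless of which lane executed them.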
Enabling the Queue
Rust

Python

```python
from a3s_code import Agent, SessionQueueConfig

qc = SessionQueueConfig()
qc.with_lane_features()
session = agent.session("/project", queue_config=qc)
```

TypeScript

```typescript
const session = agent.session('/project', {
  queueConfig: { enableAllFeatures: true },
});
```

Queue Features
Dead Letter Queue (DLQ)
Commands that fail after exhausting retry attempts (3 by default, exponential backoff) are moved to the DLQ. Query dead letters via session.dead_letters().
Metrics
When enabled, the queue collects per-lane counters, gauges, and histograms (latency percentiles). Access via session.queue_metrics().
Alerts
Queue alert events are available in the Rust SDK. In TypeScript, monitor queue depth via session.queueStats().
Advanced Features (A3S Lane v0.4.0)
1. Retry Policy
Automatically retry failed tasks with configurable strategies.
Configuration:
```hcl
queue {
  retry_policy {
    strategy         = "exponential"  # exponential, fixed, or none
    max_retries      = 3
    initial_delay_ms = 100
  }
}
```

Strategies:

- `exponential` — Exponential backoff (100ms → 200ms → 400ms)
- `fixed` — Fixed delay between retries (requires `fixed_delay_ms`)
- `none` — No automatic retries
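The delay schedule each strategy produces can be sketched with a small helper (hypothetical, mirroring the strategies above rather than a3s-lane's internals):

```python
def retry_delays(strategy, max_retries, initial_delay_ms=100, fixed_delay_ms=1000):
    """Delay (ms) before each retry attempt, per strategy."""
    if strategy == "exponential":
        return [initial_delay_ms * 2**i for i in range(max_retries)]
    if strategy == "fixed":
        return [fixed_delay_ms] * max_retries
    return []  # "none": no automatic retries

print(retry_delays("exponential", 3))  # → [100, 200, 400]
print(retry_delays("fixed", 5))        # → [1000, 1000, 1000, 1000, 1000]
```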
Example (Fixed Delay):
```hcl
queue {
  retry_policy {
    strategy       = "fixed"
    max_retries    = 5
    fixed_delay_ms = 1000  # 1 second between retries
  }
}
```

2. Rate Limiting
Control the maximum number of operations per time window.
Configuration:
```hcl
queue {
  rate_limit {
    limit_type     = "per_second"  # per_second, per_minute, per_hour, unlimited
    max_operations = 100
  }
}
```

Limit Types:

- `per_second` — Maximum operations per second
- `per_minute` — Maximum operations per minute
- `per_hour` — Maximum operations per hour
- `unlimited` — No rate limiting
Rate limiting requires the distributed feature in a3s-lane.
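One way to picture `per_second` semantics is a fixed-window counter; this is a simplified stand-in for a3s-lane's limiter, not its implementation:

```python
import time

class FixedWindowLimiter:
    """Allow at most max_operations per window (per_second = 1.0s window)."""
    def __init__(self, max_operations, window_s=1.0, clock=time.monotonic):
        self.max_operations = max_operations
        self.window_s = window_s
        self.clock = clock
        self.window_start = clock()
        self.count = 0

    def try_acquire(self):
        now = self.clock()
        if now - self.window_start >= self.window_s:
            self.window_start = now  # a new window begins; reset the counter
            self.count = 0
        if self.count < self.max_operations:
            self.count += 1
            return True
        return False

limiter = FixedWindowLimiter(max_operations=3)
print([limiter.try_acquire() for _ in range(5)])  # first 3 pass, the rest are rejected
```

The injectable `clock` makes the limiter deterministic to test; production limiters typically prefer sliding windows or token buckets to avoid window-boundary bursts.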
3. Priority Boost
Automatically boost task priority as deadlines approach.
Configuration:
```hcl
queue {
  priority_boost {
    strategy    = "standard"  # standard, aggressive, or disabled
    deadline_ms = 300000      # 5 minutes
  }
}
```

Strategies:

- `standard` — Boost at 75%, 50%, 25% remaining time
- `aggressive` — Boost earlier and more frequently
- `disabled` — No priority boost
Priority boost requires the distributed feature in a3s-lane.
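Concretely, the standard strategy's boost can be computed from the fraction of time remaining; this helper is hypothetical, mirroring the schedule shown under How It Works:

```python
def standard_boost(deadline_ms, remaining_ms):
    """Boost under 'standard': +1 at 75%, +2 at 50%, +3 at 25% remaining."""
    fraction = remaining_ms / deadline_ms
    if fraction <= 0.25:
        return 3
    if fraction <= 0.50:
        return 2
    if fraction <= 0.75:
        return 1
    return 0

deadline = 300_000  # 5 minutes
for remaining in (300_000, 225_000, 150_000, 75_000):
    print(remaining // 1000, "s left → boost +", standard_boost(deadline, remaining))
```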
How It Works:
```
Task submitted with deadline_ms = 300000 (5 minutes)

Time remaining: 225s (75%) → Priority +1
Time remaining: 150s (50%) → Priority +2
Time remaining:  75s (25%) → Priority +3
```

4. Pressure Monitoring
Emit events when queue pressure exceeds threshold.
Configuration:
```hcl
queue {
  pressure_threshold = 50  # Emit event when pending tasks >= 50
}
```

Events:

- `queue.lane.pressure` — Emitted when pending tasks >= threshold
- `queue.lane.idle` — Emitted when the queue becomes empty
Use Cases:
- Auto-scaling triggers
- Alert notifications
- Performance monitoring
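The event rules reduce to a simple check on the pending count. A hypothetical watcher, not the SDK API (the real queue likely emits edge-triggered events; this simplified version evaluates every sample):

```python
def pressure_events(pending_counts, threshold=50):
    """Return (count, event) pairs per the pressure/idle rules above."""
    events = []
    for pending in pending_counts:
        if pending >= threshold:
            events.append((pending, "queue.lane.pressure"))
        elif pending == 0:
            events.append((pending, "queue.lane.idle"))
    return events

print(pressure_events([10, 49, 50, 72, 3, 0]))
# → [(50, 'queue.lane.pressure'), (72, 'queue.lane.pressure'), (0, 'queue.lane.idle')]
```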
5. Per-Lane Timeouts
Override default timeout for specific lanes.
Configuration:
```hcl
queue {
  default_timeout_ms = 60000  # Global default: 60 seconds

  lane_timeouts {
    query   = 30000   # Query lane: 30 seconds
    execute = 120000  # Execute lane: 2 minutes
  }
}
```

Priority: Lane-specific timeout > Default timeout
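That resolution rule is a lookup with a fallback; an illustrative helper (not the SDK API):

```python
def resolve_timeout(lane, lane_timeouts, default_timeout_ms):
    """Lane-specific timeout wins over the global default."""
    return lane_timeouts.get(lane, default_timeout_ms)

lane_timeouts = {"query": 30_000, "execute": 120_000}
print(resolve_timeout("query", lane_timeouts, 60_000))     # → 30000
print(resolve_timeout("generate", lane_timeouts, 60_000))  # → 60000
```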
Complete Advanced Configuration
```hcl
queue {
  # Concurrency limits
  control_max_concurrency  = 2
  query_max_concurrency    = 10
  execute_max_concurrency  = 5
  generate_max_concurrency = 1

  # Basic features
  enable_metrics     = true
  enable_dlq         = true
  enable_alerts      = true
  storage_path       = "./queue_data"
  default_timeout_ms = 60000

  # Advanced: Retry policy
  retry_policy {
    strategy         = "exponential"
    max_retries      = 3
    initial_delay_ms = 100
  }

  # Advanced: Rate limiting
  rate_limit {
    limit_type     = "per_second"
    max_operations = 100
  }

  # Advanced: Priority boost
  priority_boost {
    strategy    = "standard"
    deadline_ms = 300000
  }

  # Advanced: Pressure monitoring
  pressure_threshold = 50

  # Advanced: Per-lane timeouts
  lane_timeouts {
    query   = 30000
    execute = 120000
  }
}
```

Queue Status API
The SDK exposes real-time queue introspection on every session with a queue configured.
SessionQueueStats
```typescript
const stats = await session.queueStats();
console.log(`Pending: ${stats.totalPending}, Active: ${stats.totalActive}`);

// External tasks waiting for completion
const external = await session.pendingExternalTasks();
console.log(`External pending: ${external.length}`);
for (const task of external) {
  console.log(`  [${task.task_id}] ${task.command_type} in ${task.lane} — ${task.remaining_ms}ms left`);
}
```

```python
stats = session.queue_stats()
print(f"Pending: {stats['total_pending']}, Active: {stats['total_active']}")

# External tasks waiting for completion
external = session.pending_external_tasks()
print(f"External pending: {len(external)}")
for task in external:
    print(f"  [{task['task_id']}] {task['command_type']} in {task['lane']} — {task['remaining_ms']}ms left")
```

Metrics and Dead Letters
External Task Handling
Any lane can be switched to External mode, where tool calls in that lane are not executed locally — instead they become ExternalTask objects that your SDK code must poll, process (locally or on a remote machine), and complete via callback. This is the mechanism for multi-machine parallel execution.
Handler Modes

| Mode | Behavior |
|---|---|
| Internal | Tools execute locally (default) |
| External | Tool calls become ExternalTask objects, completed via SDK callback |
| Hybrid | Tools execute locally; ExternalTaskPending events are emitted for observers |
ExternalTask
When a tool call enters a lane in External mode, the queue creates an ExternalTask:
| Field | Description |
|---|---|
| `task_id` | Unique task identifier |
| `lane` | Lane the task was routed to |
| `command_type` | Tool/command type (e.g., `bash`) |
| `payload` | The tool-call payload to execute |
| `timeout_ms` | Time allowed before the task fails with a timeout |
Methods: is_timed_out() checks if the task has expired, remaining_ms() returns remaining time.
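Both methods reduce to deadline arithmetic against the task's creation time. A minimal sketch (field names per the docs; the dataclass itself is illustrative, not the SDK type):

```python
import time
from dataclasses import dataclass, field

@dataclass
class ExternalTaskSketch:
    task_id: str
    timeout_ms: int
    created_at: float = field(default_factory=time.monotonic)  # seconds

    def remaining_ms(self, now=None):
        now = now if now is not None else time.monotonic()
        elapsed_ms = (now - self.created_at) * 1000
        return max(0, int(self.timeout_ms - elapsed_ms))

    def is_timed_out(self, now=None):
        return self.remaining_ms(now) == 0

task = ExternalTaskSketch("t-1", timeout_ms=60_000, created_at=0.0)
print(task.remaining_ms(now=45.0))   # 45 s elapsed → 15000 ms left
print(task.is_timed_out(now=61.0))   # past the deadline → True
```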
ExternalTaskResult
Complete a task by providing:
| Field | Description |
|---|---|
| `success` | Whether the task succeeded |
| `result` | Result payload (e.g., `output`, `exit_code`) |
| `error` | Error message when `success` is false |
Lifecycle
```
Agent loop: LLM returns tool call (e.g., bash("cargo test"))
│
├─ Lane routing: "bash" → Execute lane
│
├─ Execute lane is in External mode
│  │
│  ├─ 1. Create ExternalTask with task_id, payload, timeout
│  ├─ 2. Store in pending external tasks map
│  ├─ 3. Emit ExternalTaskPending event
│  ├─ 4. Block — waiting for completion or timeout
│  │
│  │    ┌─── SDK / Remote Worker ────────────┐
│  │    │ Poll pending_external_tasks()      │
│  │    │ Extract payload, execute remotely  │
│  │    │ Call complete_external_task(result)│
│  │    └────────────────────────────────────┘
│  │
│  ├─ 5. Receive result via oneshot channel
│  ├─ 6. Emit ExternalTaskCompleted event
│  └─ 7. Return result to agent loop
│
└─ Agent continues with tool result
```

If the SDK doesn't call `complete_external_task()` within `timeout_ms`, the task fails with a timeout error and the agent loop handles it as a tool failure.
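Step 4's block-until-complete-or-timeout behavior maps naturally onto a future with a timeout; a standalone asyncio sketch (the result shapes are assumptions):

```python
import asyncio

async def wait_for_external(completion: asyncio.Future, timeout_ms: int):
    """Block until complete_external_task() resolves the future, or time out."""
    try:
        return await asyncio.wait_for(asyncio.shield(completion), timeout_ms / 1000)
    except asyncio.TimeoutError:
        return {"success": False, "error": f"external task timed out after {timeout_ms}ms"}

async def demo():
    loop = asyncio.get_running_loop()

    done = loop.create_future()
    loop.call_later(0.01, done.set_result, {"success": True, "result": "ok"})
    print(await wait_for_external(done, timeout_ms=1000))  # completes in time

    never = loop.create_future()
    print(await wait_for_external(never, timeout_ms=20))   # nobody completes it → timeout

asyncio.run(demo())
```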
Multi-Machine Parallel Processing
The core pattern: run the agent session on a coordinator machine, route specific lanes to External mode, and have worker processes on remote machines poll and execute those tasks.
Architecture
```
┌─────────────────────────────────────────────────────┐
│ Coordinator (runs agent session)                    │
│                                                     │
│   Agent Loop ──→ Lane Queue                         │
│     ├─ Query lane    [Internal]                     │
│     │    └─ read, glob, grep → local                │
│     ├─ Execute lane  [External]                     │
│     │    └─ bash, write → ExternalTask              │
│     └─ Generate lane [Internal]                     │
│          └─ LLM calls → local                       │
│                                                     │
│ pending_external_tasks() ──→ task queue ──→ workers │
│ complete_external_task() ←── results ←── workers    │
└──────────────────────────┬──────────────────────────┘
                           │ network (gRPC, HTTP, Redis, etc.)
          ┌────────────────┼────────────────┐
          ▼                ▼                ▼
   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
   │  Worker A   │  │  Worker B   │  │  Worker C   │
   │  (machine)  │  │  (machine)  │  │  (machine)  │
   │  bash exec  │  │  bash exec  │  │  bash exec  │
   └─────────────┘  └─────────────┘  └─────────────┘
```

Coordinator: Setup and Event Loop
```typescript
import { Agent } from '@a3s-lab/code';

const agent = await Agent.create('agent.hcl');

// 1. Create session with queue
const session = agent.session('/project', {
  queueConfig: { enableAllFeatures: true, defaultTimeoutMs: 60000 },
});

// 2. Route Execute lane to external handlers
await session.setLaneHandler('execute', {
  mode: 'external',
  timeoutMs: 60000,
});

// 3. Stream and handle external tasks
const events = await session.stream(
  'Run the full test suite, build for release, and deploy to staging'
);
for (const event of events) {
  if (event.type === 'external_task_pending') {
    const tasks = await session.pendingExternalTasks();
    await Promise.all(tasks.map(async (task) => {
      // dispatchToWorker is your transport layer
      const workerResult = await dispatchToWorker(task);
      await session.completeExternalTask(task.task_id, {
        success: workerResult.success,
        result: { output: workerResult.output, exit_code: workerResult.exitCode },
        error: workerResult.error,
      });
    }));
  }
  if (event.type === 'text_delta') process.stdout.write(event.text);
  if (event.type === 'end') break;
}
```

```python
import asyncio
from a3s_code import Agent, SessionQueueConfig

agent = Agent.create("agent.hcl")

# 1. Create session with queue
qc = SessionQueueConfig()
qc.with_lane_features()
qc.set_default_timeout(60000)
session = agent.session("/project", queue_config=qc)

# 2. Route Execute lane to external handlers
session.set_lane_handler("execute", mode="external", timeout_ms=60000)

async def handle(task):
    # dispatch_to_worker is your transport layer
    worker_result = await dispatch_to_worker(task)
    session.complete_external_task(task["task_id"], {
        "success": worker_result.success,
        "result": {"output": worker_result.output, "exit_code": worker_result.exit_code},
        "error": worker_result.error,
    })

async def handle_all(tasks):
    # Process all pending tasks concurrently
    await asyncio.gather(*(handle(t) for t in tasks))

# 3. Stream and handle external tasks
for event in session.stream("Run the full test suite, build for release, and deploy to staging"):
    if event.event_type == "external_task_pending":
        tasks = session.pending_external_tasks()
        asyncio.run(handle_all(tasks))
    elif event.event_type == "text_delta":
        print(event.text, end="", flush=True)
    elif event.event_type == "end":
        break
```

Worker Process (Remote Machine)
The worker is a standalone process that receives tasks via your transport layer, executes them, and returns results:
The transport layer between coordinator and workers is your choice — A3S Code provides the ExternalTask/ExternalTaskResult contract, you provide the plumbing (gRPC, HTTP, Redis, NATS, SQS, etc.).
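Transport aside, a worker's core job is: take a task payload, execute it, and reply with an ExternalTaskResult-shaped dict. A minimal local sketch using subprocess; the payload shape is an assumption, and the in-memory `inbox` stands in for whatever transport you choose:

```python
import subprocess

def execute_task(task):
    """Run a bash-style payload and shape the reply like ExternalTaskResult."""
    proc = subprocess.run(
        task["payload"]["command"], shell=True,
        capture_output=True, text=True, timeout=task["timeout_ms"] / 1000,
    )
    return {
        "task_id": task["task_id"],
        "success": proc.returncode == 0,
        "result": {"output": proc.stdout, "exit_code": proc.returncode},
        "error": proc.stderr or None,
    }

# In a real worker this loop would poll your transport (HTTP, Redis, NATS, ...)
inbox = [{"task_id": "t-1", "payload": {"command": "echo hello"}, "timeout_ms": 60_000}]
for task in inbox:
    print(execute_task(task))
```

The coordinator then feeds each reply into `complete_external_task()` keyed by `task_id`.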
Hybrid Mode: Local Execution + External Notification
Use Hybrid mode when you want local execution as the default path but also need external visibility — for monitoring, audit logging, or shadow processing on remote machines:
```typescript
// Hybrid: execute locally, also notify external watchers
await session.setLaneHandler('execute', { mode: 'hybrid', timeoutMs: 60000 });

// Tools run locally. ExternalTaskPending events emitted for observation.
const result = await session.send('Run cargo test and fix any failures');
```

```python
# Hybrid: execute locally, also notify external watchers
session.set_lane_handler("execute", mode="hybrid", timeout_ms=60000)

# Tools run locally. ExternalTaskPending events emitted for observation.
result = session.send("Run cargo test and fix any failures")
```

Dynamic Lane Switching
Lanes can be switched between modes at runtime. This enables adaptive strategies — start in Internal mode, switch to External when workers are available:
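A schematic illustration of that switch, using an in-memory stand-in for the SDK's per-lane handler registry (not the real implementation):

```python
# Schematic: lanes start Internal; flip one to External when workers come online.
lane_modes = {"query": "internal", "execute": "internal", "generate": "internal"}

def set_lane_handler(lane, mode):
    lane_modes[lane] = mode

def route(lane, call):
    if lane_modes[lane] == "external":
        return {"pending_external_task": call}  # handed off to workers
    return {"local_result": f"ran {call}"}      # executed in-process

print(route("execute", "cargo test"))   # Internal: runs locally

set_lane_handler("execute", "external")  # workers became available
print(route("execute", "cargo test"))   # External: becomes an ExternalTask
```

In the SDK itself, the switch is the same `set_lane_handler()` / `setLaneHandler()` call shown above, issued at any point during the session.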
Planning + External Execution
When planning is enabled, each plan step goes through the same agent loop and lane queue. This means externally-handled lanes work transparently with execution plans:
Configuration Reference
Lane handlers can also be pre-configured in SessionQueueConfig instead of calling set_lane_handler() at runtime:
Backward Compatibility
When no queue_config is set in SessionOptions, the queue is not created and all tools execute sequentially via direct ToolExecutor calls, exactly as before. The queue is purely opt-in.
Direct Queue Submission
Use submit() and submit_batch() to inject tasks directly into the lane queue from your SDK code — bypassing the LLM turn. This is useful for proactively scheduling work, fan-out patterns, or driving the queue from external orchestrators.
```typescript
// Submit a single task to the Execute lane
const result = await session.submit(
  'execute',
  { command: 'cargo test --workspace' },
);
console.log(result);

// Submit multiple tasks to the Query lane in one call (executed in parallel)
const results = await session.submitBatch(
  'query',
  [
    { path: 'src/main.rs' },
    { path: 'src/lib.rs' },
    { path: 'Cargo.toml' },
  ],
);
results.forEach(r => console.log(r));
```

```python
# Submit a single task to the Execute lane
result = session.submit(
    "execute",
    {"command": "cargo test --workspace"},
)
print(result)

# Submit multiple tasks to the Query lane in one call (executed in parallel)
results = session.submit_batch(
    "query",
    [
        {"path": "src/main.rs"},
        {"path": "src/lib.rs"},
        {"path": "Cargo.toml"},
    ],
)
for r in results:
    print(r)
```

Lane names
| Lane | String value | Priority |
|---|---|---|
| Control | "control" | P0 (highest) |
| Query | "query" | P1 |
| Execute | "execute" | P2 |
| Generate | "generate" | P3 (lowest) |
submit_batch() submits all payloads to the same lane. Query-lane batches execute in parallel (up to query_max_concurrency). All other lanes execute sequentially.
API Reference
SessionQueueConfig fields
Field names as used in the queue configuration block above:

| Field | Type |
|---|---|
| `control_max_concurrency` | number |
| `query_max_concurrency` | number |
| `execute_max_concurrency` | number |
| `generate_max_concurrency` | number |
| `enable_metrics` | boolean |
| `enable_dlq` | boolean |
| `enable_alerts` | boolean |
| `storage_path` | string |
| `default_timeout_ms` | number |
| `retry_policy` | RetryPolicy |
| `rate_limit` | object (see Rate Limiting) |
| `priority_boost` | object (see Priority Boost) |
| `pressure_threshold` | number |
| `lane_timeouts` | map of lane → timeout (ms) |
LaneHandlerConfig fields
| Field | Type |
|---|---|
| `mode` | `"internal"`, `"external"`, or `"hybrid"` |
| `timeout_ms` | number (ms) |
RetryPolicy fields
| Field | Type |
|---|---|
| `strategy` | `"exponential"`, `"fixed"`, or `"none"` |
| `max_retries` | number |
| `initial_delay_ms` | number (exponential strategy) |
| `fixed_delay_ms` | number (fixed strategy) |