Lane Queue
Priority-based tool execution with parallel reads and multi-machine external processing
The Lane Queue system routes tool execution through a priority-based queue backed by a3s-lane v0.4.0. When enabled, read-only tools execute in parallel while write tools maintain sequential ordering.
Fully integrated with A3S Lane v0.4.0 — includes retry policies, rate limiting, priority boost, pressure monitoring, and per-lane timeouts.
How It Works
Each tool call is mapped to a lane based on its type. Read-only tools such as read, glob, and grep route to the Query lane; side-effecting tools such as bash, write, and edit route to the Execute lane; LLM calls route to the Generate lane.
Higher-priority lanes are scheduled before lower-priority lanes — a Query (P1) task submitted after Execute (P2) tasks will be scheduled ahead of queued Execute tasks. See Lane-Based Priority Preemption Examples for multi-language SDK demos.
When the LLM returns multiple tool calls in a single turn:
- Query-lane tools are submitted to the queue and executed in parallel (up to query_max_concurrency, default 4)
- All other tools execute sequentially in order, preserving side-effect ordering
- All pre-execution checks (permissions, HITL, hooks, skill filters) run before queue submission
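The per-turn dispatch described above can be sketched as a small asyncio simulation. This is illustrative only (the real scheduler lives in a3s-lane); dispatch and run are hypothetical names:

```python
import asyncio

QUERY_MAX_CONCURRENCY = 4  # default per the docs

async def dispatch(tool_calls, run):
    """Illustrative scheduling policy: query-lane tools run in parallel
    (bounded by a semaphore), all other tools run sequentially in order."""
    sem = asyncio.Semaphore(QUERY_MAX_CONCURRENCY)
    results = {}

    async def run_query(call):
        async with sem:
            results[call["id"]] = await run(call)

    queries = [c for c in tool_calls if c["lane"] == "query"]
    others = [c for c in tool_calls if c["lane"] != "query"]

    # Query-lane tools fan out concurrently
    query_jobs = asyncio.gather(*(run_query(c) for c in queries))
    # Other tools execute one at a time, preserving side-effect order
    for call in others:
        results[call["id"]] = await run(call)
    await query_jobs
    return results
```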
Enabling the Queue
Rust
use a3s_code_core::{Agent, SessionOptions, SessionQueueConfig};
let session = agent.session("/project", Some(
SessionOptions::new()
.with_queue_config(SessionQueueConfig::default().with_lane_features())
))?;
Python
from a3s_code import Agent, SessionQueueConfig
qc = SessionQueueConfig()
qc.with_lane_features()
session = agent.session("/project", queue_config=qc)
TypeScript
const session = agent.session('/project', {
queueConfig: { enableAllFeatures: true },
});
Queue Features
Dead Letter Queue (DLQ)
Commands that fail after exhausting retry attempts (3 by default, exponential backoff) are moved to the DLQ. Query dead letters via session.dead_letters().
Metrics
When enabled, the queue collects per-lane counters, gauges, and histograms (latency percentiles). Access via session.queue_metrics().
Alerts
Queue depth alerts fire when pending commands exceed thresholds (50 warning, 100 critical). These emit AgentEvent::QueueAlert events.
Advanced Features (A3S Lane v0.4.0)
1. Retry Policy
Automatically retry failed tasks with configurable strategies.
Configuration:
queue {
retry_policy {
strategy = "exponential" # exponential, fixed, or none
max_retries = 3
initial_delay_ms = 100
}
}
Strategies:
- exponential — Exponential backoff (100ms → 200ms → 400ms)
- fixed — Fixed delay between retries (requires fixed_delay_ms)
- none — No automatic retries
Example (Fixed Delay):
queue {
retry_policy {
strategy = "fixed"
max_retries = 5
fixed_delay_ms = 1000 # 1 second between retries
}
}
2. Rate Limiting
Control the maximum number of operations per time window.
Configuration:
queue {
rate_limit {
limit_type = "per_second" # per_second, per_minute, per_hour, unlimited
max_operations = 100
}
}
Limit Types:
- per_second — Maximum operations per second
- per_minute — Maximum operations per minute
- per_hour — Maximum operations per hour
- unlimited — No rate limiting
Rate limiting requires the distributed feature in a3s-lane.
3. Priority Boost
Automatically boost task priority as deadlines approach.
Configuration:
queue {
priority_boost {
strategy = "standard" # standard, aggressive, or disabled
deadline_ms = 300000 # 5 minutes
}
}
Strategies:
- standard — Boost at 75%, 50%, 25% remaining time
- aggressive — Boost earlier and more frequently
- disabled — No priority boost
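The standard schedule reduces to a pure function of remaining time versus deadline; standard_boost is an illustrative helper, not part of the SDK:

```python
def standard_boost(remaining_ms: int, deadline_ms: int) -> int:
    """Priority boost applied at 75%, 50%, and 25% remaining time."""
    fraction = remaining_ms / deadline_ms
    if fraction <= 0.25:
        return 3
    if fraction <= 0.50:
        return 2
    if fraction <= 0.75:
        return 1
    return 0
```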
Priority boost requires the distributed feature in a3s-lane.
How It Works:
Task submitted with deadline_ms = 300000 (5 minutes)
Time remaining: 225s (75%) → Priority +1
Time remaining: 150s (50%) → Priority +2
Time remaining: 75s (25%) → Priority +3
4. Pressure Monitoring
Emit events when queue pressure exceeds threshold.
Configuration:
queue {
pressure_threshold = 50 # Emit event when pending tasks >= 50
}
Events:
- queue.lane.pressure — Emitted when pending tasks >= threshold
- queue.lane.idle — Emitted when queue becomes empty
Use Cases:
- Auto-scaling triggers
- Alert notifications
- Performance monitoring
5. Per-Lane Timeouts
Override default timeout for specific lanes.
Configuration:
queue {
default_timeout_ms = 60000 # Global default: 60 seconds
lane_timeouts {
query = 30000 # Query lane: 30 seconds
execute = 120000 # Execute lane: 2 minutes
}
}
Priority: Lane-specific timeout > Default timeout
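The resolution rule is a simple lookup with fallback; effective_timeout_ms is an illustrative helper:

```python
def effective_timeout_ms(lane: str, lane_timeouts: dict[str, int],
                         default_timeout_ms: int) -> int:
    """Lane-specific timeout wins over the global default."""
    return lane_timeouts.get(lane, default_timeout_ms)
```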
Complete Advanced Configuration
queue {
# Concurrency limits
control_max_concurrency = 2
query_max_concurrency = 10
execute_max_concurrency = 5
generate_max_concurrency = 1
# Basic features
enable_metrics = true
enable_dlq = true
enable_alerts = true
storage_path = "./queue_data"
default_timeout_ms = 60000
# Advanced: Retry policy
retry_policy {
strategy = "exponential"
max_retries = 3
initial_delay_ms = 100
}
# Advanced: Rate limiting
rate_limit {
limit_type = "per_second"
max_operations = 100
}
# Advanced: Priority boost
priority_boost {
strategy = "standard"
deadline_ms = 300000
}
# Advanced: Pressure monitoring
pressure_threshold = 50
# Advanced: Per-lane timeouts
lane_timeouts {
query = 30000
execute = 120000
}
}
Queue Status API
The SDK exposes real-time queue introspection on every session with a queue configured.
SessionQueueStats
let stats = session.queue_stats().await;
println!("Total pending: {}", stats.total_pending);
println!("Total active: {}", stats.total_active);
// Per-lane breakdown
for (lane, lane_stats) in &stats.lanes {
println!(
" {:?}: pending={}, active={}, completed={}, failed={}",
lane, lane_stats.pending, lane_stats.active,
lane_stats.completed, lane_stats.failed,
);
}
// External tasks waiting for completion
let external = session.pending_external_tasks().await;
println!("External pending: {}", external.len());
for task in &external {
println!(
" [{}] {} in {:?} lane — {}ms remaining",
task.task_id, task.command_type, task.lane, task.remaining_ms(),
);
}
TypeScript
const stats = await session.queueStats();
console.log(`Pending: ${stats.totalPending}, Active: ${stats.totalActive}`);
// External tasks waiting for completion
const external = await session.pendingExternalTasks();
console.log(`External pending: ${external.length}`);
for (const task of external) {
console.log(` [${task.task_id}] ${task.command_type} in ${task.lane} — ${task.remaining_ms}ms left`);
}
Python
stats = session.queue_stats()
print(f"Pending: {stats['total_pending']}, Active: {stats['total_active']}")
# External tasks waiting for completion
external = session.pending_external_tasks()
print(f"External pending: {len(external)}")
for task in external:
print(f" [{task['task_id']}] {task['command_type']} in {task['lane']} — {task['remaining_ms']}ms left")
Metrics and Dead Letters
// Metrics snapshot (if metrics enabled)
if let Some(metrics) = session.queue_metrics().await {
println!("{:?}", metrics);
}
// Dead letters (if DLQ enabled)
let dead = session.dead_letters().await;
for dl in &dead {
println!("Dead: {} — {}", dl.command_type, dl.error);
}
External Task Handling
Any lane can be switched to External mode, where tool calls in that lane are not executed locally — instead they become ExternalTask objects that your SDK code must poll, process (locally or on a remote machine), and complete via callback. This is the mechanism for multi-machine parallel execution.
Handler Modes
Each lane runs in one of three handler modes: Internal (tools execute locally, the default), External (tool calls become ExternalTask objects for your code to complete), or Hybrid (tools execute locally while ExternalTaskPending events are still emitted for external watchers).
ExternalTask
When a tool call enters a lane in External mode, the queue creates an ExternalTask carrying the task_id, command_type, lane, payload, and timeout.
Methods: is_timed_out() checks if the task has expired, remaining_ms() returns remaining time.
ExternalTaskResult
Complete a task by providing an ExternalTaskResult with a success flag, a result value, and an optional error.
Lifecycle
Agent loop: LLM returns tool call (e.g., bash("cargo test"))
│
├─ Lane routing: "bash" → Execute lane
│
├─ Execute lane is External mode
│ │
│ ├─ 1. Create ExternalTask with task_id, payload, timeout
│ ├─ 2. Store in pending external tasks map
│ ├─ 3. Emit ExternalTaskPending event
│ ├─ 4. Block — waiting for completion or timeout
│ │
│ │ ┌─── SDK / Remote Worker ───┐
│ │ │ Poll pending_external_tasks() │
│ │ │ Extract payload, execute remotely │
│ │ │ Call complete_external_task(result) │
│ │ └────────────────────────────────────┘
│ │
│ ├─ 5. Receive result via oneshot channel
│ ├─ 6. Emit ExternalTaskCompleted event
│ └─ 7. Return result to agent loop
│
└─ Agent continues with tool result
If the SDK doesn't call complete_external_task() within timeout_ms, the task fails with a timeout error and the agent loop handles it as a tool failure.
Multi-Machine Parallel Processing
The core pattern: run the agent session on a coordinator machine, route specific lanes to External mode, and have worker processes on remote machines poll and execute those tasks.
Architecture
┌─────────────────────────────────────────────────────┐
│ Coordinator (runs agent session) │
│ │
│ Agent Loop ──→ Lane Queue │
│ ├─ Query lane [Internal] │
│ │ └─ read, glob, grep → local │
│ ├─ Execute lane [External] │
│ │ └─ bash, write → ExternalTask │
│ └─ Generate lane [Internal] │
│ └─ LLM calls → local │
│ │
│ pending_external_tasks() ──→ task queue ──→ workers │
│ complete_external_task() ←── results ←── workers │
└──────────────────────────┬──────────────────────────┘
│ network (gRPC, HTTP, Redis, etc.)
┌────────────────┼────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Worker A │ │ Worker B │ │ Worker C │
│ (machine) │ │ (machine) │ │ (machine) │
│ bash exec │ │ bash exec │ │ bash exec │
└─────────────┘ └─────────────┘ └─────────────┘
Coordinator: Setup and Event Loop
use a3s_code_core::{
Agent, AgentEvent, SessionOptions, SessionQueueConfig,
SessionLane, LaneHandlerConfig, TaskHandlerMode, ExternalTaskResult,
};
use tokio::sync::mpsc;
let agent = Agent::new("agent.hcl").await?;
// 1. Create session with queue enabled
let session = agent.session("/project", Some(
SessionOptions::new()
.with_queue_config(
SessionQueueConfig::default()
.with_lane_features()
.with_timeout(60_000)
)
))?;
// 2. Route Execute lane to external handlers
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
mode: TaskHandlerMode::External,
timeout_ms: 60_000,
}).await;
// 3. Stream the prompt — agent will produce ExternalTask objects for bash/write/edit
let (mut rx, _handle) = session.stream("Run the full test suite, build for release, and deploy to staging").await?;
// 4. Event loop: dispatch external tasks to workers, feed results back
while let Some(event) = rx.recv().await {
match event {
AgentEvent::ExternalTaskPending { .. } => {
// Poll all pending tasks and dispatch to remote workers
let tasks = session.pending_external_tasks().await;
for task in tasks {
let session = session.clone();
tokio::spawn(async move {
// dispatch_to_worker is your transport layer
// (gRPC, HTTP, Redis pub/sub, message queue, etc.)
let worker_result = dispatch_to_worker(&task).await;
session.complete_external_task(&task.task_id, ExternalTaskResult {
success: worker_result.success,
result: serde_json::json!({
"output": worker_result.output,
"exit_code": worker_result.exit_code,
}),
error: worker_result.error,
}).await;
});
}
}
AgentEvent::TextDelta { text } => print!("{text}"),
AgentEvent::End { .. } => break,
_ => {}
}
}
TypeScript
const { Agent } = require('@a3s-lab/code');
const agent = await Agent.create('agent.hcl');
// 1. Create session with queue
const session = agent.session('/project', {
queueConfig: { enableAllFeatures: true, defaultTimeoutMs: 60000 },
});
// 2. Route Execute lane to external handlers
await session.setLaneHandler('execute', {
mode: 'external',
timeoutMs: 60000,
});
// 3. Stream and handle external tasks
const events = await session.stream(
'Run the full test suite, build for release, and deploy to staging'
);
for (const event of events) {
if (event.type === 'external_task_pending') {
const tasks = await session.pendingExternalTasks();
await Promise.all(tasks.map(async (task) => {
// dispatchToWorker is your transport layer
const workerResult = await dispatchToWorker(task);
await session.completeExternalTask(task.task_id, {
success: workerResult.success,
result: { output: workerResult.output, exit_code: workerResult.exitCode },
error: workerResult.error,
});
}));
}
if (event.type === 'text_delta') process.stdout.write(event.text);
if (event.type === 'end') break;
}
Python
import asyncio
from a3s_code import Agent, SessionQueueConfig
agent = Agent.create("agent.hcl")
# 1. Create session with queue
qc = SessionQueueConfig()
qc.with_lane_features()
qc.set_default_timeout(60000)
session = agent.session("/project", queue_config=qc)
# 2. Route Execute lane to external handlers
session.set_lane_handler("execute", mode="external", timeout_ms=60000)
# 3. Stream and handle external tasks
for event in session.stream("Run the full test suite, build for release, and deploy to staging"):
if event.event_type == "external_task_pending":
tasks = session.pending_external_tasks()
async def handle(task):
# dispatch_to_worker is your transport layer
worker_result = await dispatch_to_worker(task)
session.complete_external_task(task["task_id"], {
"success": worker_result.success,
"result": {"output": worker_result.output, "exit_code": worker_result.exit_code},
"error": worker_result.error,
})
async def handle_all():
    await asyncio.gather(*[handle(t) for t in tasks])
asyncio.run(handle_all())
elif event.event_type == "text_delta":
print(event.text, end="", flush=True)
elif event.event_type == "end":
break
Worker Process (Remote Machine)
The worker is a standalone process that receives tasks via your transport layer, executes them, and returns results:
// worker.rs — runs on each remote machine
use std::process::Command;
/// Receives ExternalTask payloads, executes locally, returns results
async fn worker_loop(mut receiver: impl TaskReceiver) {
while let Some(task_envelope) = receiver.next().await {
let task = task_envelope.task; // ExternalTask fields
let result = match task.command_type.as_str() {
"bash" => {
let cmd = task.payload["command"].as_str().unwrap_or("");
let output = Command::new("sh")
.arg("-c")
.arg(cmd)
.current_dir(task.payload["working_dir"].as_str().unwrap_or("."))
.output();
match output {
Ok(out) => WorkerResult {
success: out.status.success(),
output: String::from_utf8_lossy(&out.stdout).to_string(),
exit_code: out.status.code().unwrap_or(1),
error: if out.status.success() { None }
else { Some(String::from_utf8_lossy(&out.stderr).to_string()) },
},
Err(e) => WorkerResult {
success: false, output: String::new(), exit_code: 1,
error: Some(e.to_string()),
},
}
}
_ => WorkerResult {
success: false, output: String::new(), exit_code: 1,
error: Some(format!("Worker does not handle: {}", task.command_type)),
},
};
// Send result back to coordinator
task_envelope.respond(result).await;
}
}
The transport layer between coordinator and workers is your choice — A3S Code provides the ExternalTask/ExternalTaskResult contract; you provide the plumbing (gRPC, HTTP, Redis, NATS, SQS, etc.).
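For workers not written in Rust, the same contract can be sketched in Python. execute_bash_payload is an illustrative helper; the payload keys (command, working_dir) mirror the Rust worker above and are assumptions about the task schema:

```python
import subprocess

def execute_bash_payload(payload: dict) -> dict:
    """Execute a bash ExternalTask payload locally and build a dict
    matching the ExternalTaskResult contract (success / result / error)."""
    cmd = payload.get("command", "")
    cwd = payload.get("working_dir", ".")
    try:
        out = subprocess.run(["sh", "-c", cmd], cwd=cwd,
                             capture_output=True, text=True)
    except OSError as e:
        # Could not spawn the shell at all
        return {"success": False,
                "result": {"output": "", "exit_code": 1},
                "error": str(e)}
    return {
        "success": out.returncode == 0,
        "result": {"output": out.stdout, "exit_code": out.returncode},
        "error": None if out.returncode == 0 else out.stderr,
    }
```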
Hybrid Mode: Local Execution + External Notification
Use Hybrid mode when you want local execution as the default path but also need external visibility — for monitoring, audit logging, or shadow processing on remote machines:
// Hybrid: execute locally, also notify external watchers
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
mode: TaskHandlerMode::Hybrid,
timeout_ms: 60_000,
}).await;
// The agent runs normally — tools execute locally as default.
// ExternalTaskPending events are still emitted, so external watchers
// can observe what's happening (e.g., for audit or metrics).
let result = session.send("Run cargo test and fix any failures").await?;
TypeScript
// Hybrid: execute locally, also notify external watchers
await session.setLaneHandler('execute', { mode: 'hybrid', timeoutMs: 60000 });
// Tools run locally. ExternalTaskPending events emitted for observation.
const result = await session.send('Run cargo test and fix any failures');
Python
# Hybrid: execute locally, also notify external watchers
session.set_lane_handler("execute", mode="hybrid", timeout_ms=60000)
# Tools run locally. ExternalTaskPending events emitted for observation.
result = session.send("Run cargo test and fix any failures")
Dynamic Lane Switching
Lanes can be switched between modes at runtime. This enables adaptive strategies — start in Internal mode, switch to External when workers are available:
// Start with local execution (default)
let result = session.send("Analyze the codebase").await?;
// Workers have come online — switch to external for heavy execution
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
mode: TaskHandlerMode::External,
timeout_ms: 60_000,
}).await;
// Now bash/write/edit tasks route to remote workers
let result = session.send("Run the full CI pipeline across all packages").await?;
// Workers going offline — switch back to local
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
mode: TaskHandlerMode::Internal,
timeout_ms: 60_000,
}).await;
Planning + External Execution
When planning is enabled, each plan step goes through the same agent loop and lane queue. This means externally-handled lanes work transparently with execution plans:
let config = AgentConfig {
planning_enabled: true,
// ...
};
// Set Execute lane to External
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
mode: TaskHandlerMode::External,
timeout_ms: 120_000,
}).await;
// The planner decomposes the task into steps.
// Each step's tool calls route through the lane queue.
// bash/write/edit calls in each step → ExternalTask → workers
let (mut rx, _handle) = session.stream(
"Refactor the auth module to use JWT, update all tests, and verify CI passes"
).await?;
while let Some(event) = rx.recv().await {
match event {
AgentEvent::TaskUpdated { tasks, .. } => {
// Track plan progress
let done = tasks.iter().filter(|t| t.status == TaskStatus::Completed).count();
println!("Plan progress: {}/{} steps", done, tasks.len());
}
AgentEvent::ExternalTaskPending { .. } => {
// Dispatch to workers — same pattern as above
for task in session.pending_external_tasks().await {
let session = session.clone();
tokio::spawn(async move {
let result = dispatch_to_worker(&task).await;
session.complete_external_task(&task.task_id, result.into()).await;
});
}
}
AgentEvent::End { .. } => break,
_ => {}
}
}
Configuration Reference
pub struct SessionQueueConfig {
// Concurrency limits
pub control_max_concurrency: usize, // Default: 2
pub query_max_concurrency: usize, // Default: 4
pub execute_max_concurrency: usize, // Default: 2
pub generate_max_concurrency: usize, // Default: 1
// Lane handlers
pub lane_handlers: HashMap<SessionLane, LaneHandlerConfig>, // Per-lane mode config
// Basic features
pub enable_dlq: bool, // Default: false
pub dlq_max_size: Option<usize>, // Default: 1000 (when enabled)
pub enable_metrics: bool, // Default: false
pub enable_alerts: bool, // Default: false
pub default_timeout_ms: Option<u64>, // Default: 60000 (when enabled)
pub storage_path: Option<PathBuf>, // Default: None (in-memory only)
// Advanced features (A3S Lane v0.4.0)
pub retry_policy: Option<RetryPolicyConfig>,
pub rate_limit: Option<RateLimitConfig>,
pub priority_boost: Option<PriorityBoostConfig>,
pub pressure_threshold: Option<usize>,
pub lane_timeouts: HashMap<SessionLane, u64>,
}
pub struct RetryPolicyConfig {
pub strategy: String, // "exponential", "fixed", "none"
pub max_retries: u32,
pub initial_delay_ms: u64,
pub fixed_delay_ms: Option<u64>, // For "fixed" strategy
}
pub struct RateLimitConfig {
pub limit_type: String, // "per_second", "per_minute", "per_hour", "unlimited"
pub max_operations: Option<u64>,
}
pub struct PriorityBoostConfig {
pub strategy: String, // "standard", "aggressive", "disabled"
pub deadline_ms: Option<u64>,
}
pub struct LaneHandlerConfig {
pub mode: TaskHandlerMode, // Internal | External | Hybrid
pub timeout_ms: u64, // Default: 60000
}
Lane handlers can also be pre-configured in SessionQueueConfig instead of calling set_lane_handler() at runtime:
use std::collections::HashMap;
let mut lane_handlers = HashMap::new();
lane_handlers.insert(SessionLane::Execute, LaneHandlerConfig {
mode: TaskHandlerMode::External,
timeout_ms: 60_000,
});
let config = SessionQueueConfig {
lane_handlers,
enable_dlq: true,
enable_metrics: true,
..Default::default()
};
let session = agent.session("/project", Some(
SessionOptions::new().with_queue_config(config)
))?;
Backward Compatibility
When no queue_config is set in SessionOptions, the queue is not created and all tools execute sequentially via direct ToolExecutor calls, exactly as before. The queue is purely opt-in.
API Reference
SessionQueueConfig, LaneHandlerConfig, and RetryPolicyConfig fields are listed with their types and defaults in the Configuration Reference structs above.