Lane Queue

Priority-based tool execution with parallel reads and multi-machine external processing

The Lane Queue system routes tool execution through a priority-based queue backed by a3s-lane v0.4.0. When enabled, read-only tools execute in parallel while write tools maintain sequential ordering.

Fully integrated with A3S Lane v0.4.0 — includes retry policies, rate limiting, priority boost, pressure monitoring, and per-lane timeouts.

How It Works

Each tool call is mapped to a lane based on its type: read-only tools (read, glob, grep) route to the Query lane, side-effecting tools (bash, write, edit) to the Execute lane, LLM calls to the Generate lane, and session control operations to the Control lane. Each lane has its own priority and concurrency limit.
Higher-priority lanes are scheduled before lower-priority lanes — a Query (P1) task submitted after Execute (P2) tasks will be scheduled ahead of queued Execute tasks. See Lane-Based Priority Preemption Examples for multi-language SDK demos.

When the LLM returns multiple tool calls in a single turn:

  1. Query-lane tools are submitted to the queue and executed in parallel (up to query_max_concurrency, default 4)
  2. All other tools execute sequentially in order, preserving side-effect ordering
  3. All pre-execution checks (permissions, HITL, hooks, skill filters) run before queue submission
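The routing step can be sketched as a match on the tool name. This is illustrative only, based on the tools named in this page's architecture diagram; the real mapping is internal to a3s-code-core:

```rust
// Illustrative routing sketch: read-only tools run in the parallel
// Query lane; everything else stays sequential.
#[derive(Debug, PartialEq)]
enum Lane {
    Query,   // read-only: runs in parallel up to query_max_concurrency
    Execute, // side-effecting: preserves ordering
}

fn lane_for(tool: &str) -> Lane {
    match tool {
        "read" | "glob" | "grep" => Lane::Query,
        _ => Lane::Execute, // bash, write, edit, ...
    }
}

fn main() {
    assert_eq!(lane_for("grep"), Lane::Query);
    assert_eq!(lane_for("bash"), Lane::Execute);
}
```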

Enabling the Queue

Rust

use a3s_code_core::{Agent, SessionOptions, SessionQueueConfig};

let session = agent.session("/project", Some(
    SessionOptions::new()
        .with_queue_config(SessionQueueConfig::default().with_lane_features())
))?;

Python

from a3s_code import Agent, SessionQueueConfig

qc = SessionQueueConfig()
qc.with_lane_features()
session = agent.session("/project", queue_config=qc)

TypeScript

const session = agent.session('/project', {
  queueConfig: { enableAllFeatures: true },
});

Queue Features

Dead Letter Queue (DLQ)

Commands that fail after exhausting retry attempts (3 by default, with exponential backoff) are moved to the DLQ. Inspect dead letters via session.dead_letters().

Metrics

When enabled, the queue collects per-lane counters, gauges, and histograms (latency percentiles). Access via session.queue_metrics().

Alerts

Queue depth alerts fire when pending commands exceed thresholds (50 warning, 100 critical). These emit AgentEvent::QueueAlert events.
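The two thresholds translate to a check like the following. A minimal sketch using the default values quoted above; the real check lives inside the queue and emits AgentEvent::QueueAlert:

```rust
/// Map pending-command depth to an alert level using the default
/// thresholds (50 = warning, 100 = critical).
fn alert_level(pending: usize) -> Option<&'static str> {
    match pending {
        p if p >= 100 => Some("critical"),
        p if p >= 50 => Some("warning"),
        _ => None,
    }
}

fn main() {
    assert_eq!(alert_level(10), None);
    assert_eq!(alert_level(50), Some("warning"));
    assert_eq!(alert_level(150), Some("critical"));
}
```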

Advanced Features (A3S Lane v0.4.0)

1. Retry Policy

Automatically retry failed tasks with configurable strategies.

Configuration:

queue {
  retry_policy {
    strategy = "exponential"  # exponential, fixed, or none
    max_retries = 3
    initial_delay_ms = 100
  }
}

Strategies:

  • exponential — Exponential backoff (100ms → 200ms → 400ms)
  • fixed — Fixed delay between retries (requires fixed_delay_ms)
  • none — No automatic retries

Example (Fixed Delay):

queue {
  retry_policy {
    strategy = "fixed"
    max_retries = 5
    fixed_delay_ms = 1000  # 1 second between retries
  }
}
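The delay schedules for the three strategies reduce to a small function. A sketch of the schedule described above, not the crate's internal implementation:

```rust
/// Delay before retry attempt `n` (0-based) for each strategy.
fn retry_delay_ms(strategy: &str, initial_ms: u64, fixed_ms: u64, attempt: u32) -> Option<u64> {
    match strategy {
        "exponential" => Some(initial_ms * 2u64.pow(attempt)), // 100 -> 200 -> 400
        "fixed" => Some(fixed_ms),
        _ => None, // "none": no automatic retries
    }
}

fn main() {
    assert_eq!(retry_delay_ms("exponential", 100, 0, 0), Some(100));
    assert_eq!(retry_delay_ms("exponential", 100, 0, 2), Some(400));
    assert_eq!(retry_delay_ms("fixed", 0, 1000, 4), Some(1000));
    assert_eq!(retry_delay_ms("none", 100, 0, 0), None);
}
```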

2. Rate Limiting

Control the maximum number of operations per time window.

Configuration:

queue {
  rate_limit {
    limit_type = "per_second"  # per_second, per_minute, per_hour, unlimited
    max_operations = 100
  }
}

Limit Types:

  • per_second — Maximum operations per second
  • per_minute — Maximum operations per minute
  • per_hour — Maximum operations per hour
  • unlimited — No rate limiting

Rate limiting requires the distributed feature in a3s-lane.
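The window semantics can be illustrated with a fixed-window counter. A self-contained sketch of the per_second / per_minute / per_hour behavior; a3s-lane's implementation behind the distributed feature may use a different algorithm:

```rust
/// Minimal fixed-window rate limiter sketch.
struct RateLimiter {
    window_ms: u64,
    max_ops: u64,
    window_start: u64,
    count: u64,
}

impl RateLimiter {
    fn new(limit_type: &str, max_ops: u64) -> Option<Self> {
        let window_ms = match limit_type {
            "per_second" => 1_000,
            "per_minute" => 60_000,
            "per_hour" => 3_600_000,
            _ => return None, // "unlimited": no limiter
        };
        Some(Self { window_ms, max_ops, window_start: 0, count: 0 })
    }

    /// Returns true if an operation may proceed at time `now_ms`.
    fn try_acquire(&mut self, now_ms: u64) -> bool {
        if now_ms - self.window_start >= self.window_ms {
            self.window_start = now_ms; // new window
            self.count = 0;
        }
        if self.count < self.max_ops {
            self.count += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut rl = RateLimiter::new("per_second", 2).unwrap();
    assert!(rl.try_acquire(0));
    assert!(rl.try_acquire(10));
    assert!(!rl.try_acquire(20));   // window exhausted
    assert!(rl.try_acquire(1_000)); // next window
}
```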

3. Priority Boost

Automatically boost task priority as deadlines approach.

Configuration:

queue {
  priority_boost {
    strategy = "standard"  # standard, aggressive, or disabled
    deadline_ms = 300000   # 5 minutes
  }
}

Strategies:

  • standard — Boost at 75%, 50%, 25% remaining time
  • aggressive — Boost earlier and more frequently
  • disabled — No priority boost

Priority boost requires the distributed feature in a3s-lane.

How It Works:

Task submitted with deadline_ms = 300000 (5 minutes)

Time remaining: 225s (75%) → Priority +1
Time remaining: 150s (50%) → Priority +2
Time remaining: 75s (25%)  → Priority +3
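The standard schedule above amounts to a threshold check on the remaining-time fraction. A sketch, assuming boosts apply at or below each threshold:

```rust
/// Boost amount for the "standard" strategy at the 75% / 50% / 25%
/// remaining-time thresholds.
fn standard_boost(deadline_ms: u64, remaining_ms: u64) -> u8 {
    let frac = remaining_ms as f64 / deadline_ms as f64;
    if frac <= 0.25 {
        3
    } else if frac <= 0.50 {
        2
    } else if frac <= 0.75 {
        1
    } else {
        0
    }
}

fn main() {
    assert_eq!(standard_boost(300_000, 300_000), 0); // just submitted
    assert_eq!(standard_boost(300_000, 225_000), 1); // 75% remaining
    assert_eq!(standard_boost(300_000, 150_000), 2); // 50% remaining
    assert_eq!(standard_boost(300_000, 75_000), 3);  // 25% remaining
}
```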

4. Pressure Monitoring

Emit events when queue pressure exceeds threshold.

Configuration:

queue {
  pressure_threshold = 50  # Emit event when pending tasks >= 50
}

Events:

  • queue.lane.pressure — Emitted when pending tasks >= threshold
  • queue.lane.idle — Emitted when queue becomes empty

Use Cases:

  • Auto-scaling triggers
  • Alert notifications
  • Performance monitoring
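One plausible reading of the two events as edge-triggered transitions on queue depth (a sketch, not a3s-lane's actual emission logic):

```rust
/// "queue.lane.pressure" fires when pending crosses the threshold;
/// "queue.lane.idle" fires when the queue drains to empty.
fn pressure_event(prev_pending: usize, pending: usize, threshold: usize) -> Option<&'static str> {
    if pending >= threshold && prev_pending < threshold {
        Some("queue.lane.pressure")
    } else if pending == 0 && prev_pending > 0 {
        Some("queue.lane.idle")
    } else {
        None
    }
}

fn main() {
    assert_eq!(pressure_event(49, 50, 50), Some("queue.lane.pressure"));
    assert_eq!(pressure_event(50, 51, 50), None); // already above threshold
    assert_eq!(pressure_event(3, 0, 50), Some("queue.lane.idle"));
}
```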

5. Per-Lane Timeouts

Override default timeout for specific lanes.

Configuration:

queue {
  default_timeout_ms = 60000  # Global default: 60 seconds

  lane_timeouts {
    query = 30000      # Query lane: 30 seconds
    execute = 120000   # Execute lane: 2 minutes
  }
}

Priority: Lane-specific timeout > Default timeout
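The precedence rule is a straightforward map lookup with a fallback. A sketch using the values from the configuration above:

```rust
use std::collections::HashMap;

/// Resolve the effective timeout for a lane: lane-specific override
/// first, then the global default.
fn effective_timeout(lane: &str, lane_timeouts: &HashMap<&str, u64>, default_ms: u64) -> u64 {
    lane_timeouts.get(lane).copied().unwrap_or(default_ms)
}

fn main() {
    let overrides = HashMap::from([("query", 30_000u64), ("execute", 120_000)]);
    assert_eq!(effective_timeout("query", &overrides, 60_000), 30_000);
    assert_eq!(effective_timeout("generate", &overrides, 60_000), 60_000); // falls back
}
```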

Complete Advanced Configuration

queue {
  # Concurrency limits
  control_max_concurrency = 2
  query_max_concurrency = 10
  execute_max_concurrency = 5
  generate_max_concurrency = 1

  # Basic features
  enable_metrics = true
  enable_dlq = true
  enable_alerts = true
  storage_path = "./queue_data"
  default_timeout_ms = 60000

  # Advanced: Retry policy
  retry_policy {
    strategy = "exponential"
    max_retries = 3
    initial_delay_ms = 100
  }

  # Advanced: Rate limiting
  rate_limit {
    limit_type = "per_second"
    max_operations = 100
  }

  # Advanced: Priority boost
  priority_boost {
    strategy = "standard"
    deadline_ms = 300000
  }

  # Advanced: Pressure monitoring
  pressure_threshold = 50

  # Advanced: Per-lane timeouts
  lane_timeouts {
    query = 30000
    execute = 120000
  }
}

Queue Status API

The SDK exposes real-time queue introspection on every session with a queue configured.

SessionQueueStats

Rust

let stats = session.queue_stats().await;

println!("Total pending: {}", stats.total_pending);
println!("Total active:  {}", stats.total_active);

// Per-lane breakdown
for (lane, lane_stats) in &stats.lanes {
    println!(
        "  {:?}: pending={}, active={}, completed={}, failed={}",
        lane, lane_stats.pending, lane_stats.active,
        lane_stats.completed, lane_stats.failed,
    );
}

// External tasks waiting for completion
let external = session.pending_external_tasks().await;
println!("External pending: {}", external.len());
for task in &external {
    println!(
        "  [{}] {} in {:?} lane — {}ms remaining",
        task.task_id, task.command_type, task.lane, task.remaining_ms(),
    );
}

TypeScript

const stats = await session.queueStats();
console.log(`Pending: ${stats.totalPending}, Active: ${stats.totalActive}`);

// External tasks waiting for completion
const external = await session.pendingExternalTasks();
console.log(`External pending: ${external.length}`);
for (const task of external) {
  console.log(`  [${task.task_id}] ${task.command_type} in ${task.lane} lane — ${task.remaining_ms}ms remaining`);
}

Python

stats = session.queue_stats()
print(f"Pending: {stats['total_pending']}, Active: {stats['total_active']}")

# External tasks waiting for completion
external = session.pending_external_tasks()
print(f"External pending: {len(external)}")
for task in external:
    print(f"  [{task['task_id']}] {task['command_type']} in {task['lane']} lane — {task['remaining_ms']}ms remaining")

Metrics and Dead Letters

// Metrics snapshot (if metrics enabled)
if let Some(metrics) = session.queue_metrics().await {
    println!("{:?}", metrics);
}

// Dead letters (if DLQ enabled)
let dead = session.dead_letters().await;
for dl in &dead {
    println!("Dead: {} — {}", dl.command_type, dl.error);
}

External Task Handling

Any lane can be switched to External mode, where tool calls in that lane are not executed locally — instead they become ExternalTask objects that your SDK code must poll, process (locally or on a remote machine), and complete via callback. This is the mechanism for multi-machine parallel execution.

Handler Modes

  • Internal — tool calls execute locally in-process (the default)
  • External — tool calls are not executed locally; they become ExternalTask objects for the SDK to process and complete
  • Hybrid — tool calls execute locally, and ExternalTaskPending events are still emitted for external watchers

ExternalTask

When a tool call enters a lane in External mode, the queue creates an ExternalTask:

  • task_id — unique identifier for the task
  • command_type — the tool being invoked (e.g. "bash")
  • lane — the lane the task was routed through
  • payload — the tool arguments as JSON
  • timeout_ms — how long the queue waits for completion

Methods: is_timed_out() checks if the task has expired, remaining_ms() returns remaining time.
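The two helpers reduce to simple deadline arithmetic. A sketch written against an explicit clock so the arithmetic is visible; the real methods read the task's own creation time and timeout_ms (the created_at parameter here is an assumption for illustration):

```rust
/// Milliseconds left before the task's deadline, saturating at zero.
fn remaining_ms(created_at_ms: u64, timeout_ms: u64, now_ms: u64) -> u64 {
    (created_at_ms + timeout_ms).saturating_sub(now_ms)
}

/// A task is timed out once no time remains.
fn is_timed_out(created_at_ms: u64, timeout_ms: u64, now_ms: u64) -> bool {
    remaining_ms(created_at_ms, timeout_ms, now_ms) == 0
}

fn main() {
    assert_eq!(remaining_ms(0, 60_000, 45_000), 15_000);
    assert!(!is_timed_out(0, 60_000, 59_999));
    assert!(is_timed_out(0, 60_000, 60_000));
}
```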

ExternalTaskResult

Complete a task by providing:

  • success — whether the tool call succeeded
  • result — the tool output as JSON
  • error — error message, if any

Lifecycle

Agent loop: LLM returns tool call (e.g., bash("cargo test"))

    ├─ Lane routing: "bash" → Execute lane

    ├─ Execute lane is External mode
    │   │
    │   ├─ 1. Create ExternalTask with task_id, payload, timeout
    │   ├─ 2. Store in pending external tasks map
    │   ├─ 3. Emit ExternalTaskPending event
    │   ├─ 4. Block — waiting for completion or timeout
    │   │
    │   │    ┌──────── SDK / Remote Worker ────────┐
    │   │    │ Poll pending_external_tasks()       │
    │   │    │ Extract payload, execute remotely   │
    │   │    │ Call complete_external_task(result) │
    │   │    └─────────────────────────────────────┘
    │   │
    │   ├─ 5. Receive result via oneshot channel
    │   ├─ 6. Emit ExternalTaskCompleted event
    │   └─ 7. Return result to agent loop

    └─ Agent continues with tool result

If the SDK doesn't call complete_external_task() within timeout_ms, the task fails with a timeout error and the agent loop handles it as a tool failure.

Multi-Machine Parallel Processing

The core pattern: run the agent session on a coordinator machine, route specific lanes to External mode, and have worker processes on remote machines poll and execute those tasks.

Architecture

┌──────────────────────────────────────────────────────┐
│ Coordinator (runs agent session)                     │
│                                                      │
│  Agent Loop ──→ Lane Queue                           │
│                   ├─ Query lane    [Internal]        │
│                   │   └─ read, glob, grep → local    │
│                   ├─ Execute lane  [External]        │
│                   │   └─ bash, write → ExternalTask  │
│                   └─ Generate lane [Internal]        │
│                       └─ LLM calls → local           │
│                                                      │
│  pending_external_tasks() ──→ task queue ──→ workers │
│  complete_external_task() ←── results ←── workers    │
└──────────────────────────┬───────────────────────────┘
                           │ network (gRPC, HTTP, Redis, etc.)
           ┌───────────────┼───────────────┐
           ▼               ▼               ▼
   ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
   │  Worker A   │ │  Worker B   │ │  Worker C   │
   │  (machine)  │ │  (machine)  │ │  (machine)  │
   │  bash exec  │ │  bash exec  │ │  bash exec  │
   └─────────────┘ └─────────────┘ └─────────────┘

Coordinator: Setup and Event Loop

Rust

use a3s_code_core::{
    Agent, AgentEvent, SessionOptions, SessionQueueConfig,
    SessionLane, LaneHandlerConfig, TaskHandlerMode, ExternalTaskResult,
};
use tokio::sync::mpsc;

let agent = Agent::new("agent.hcl").await?;

// 1. Create session with queue enabled
let session = agent.session("/project", Some(
    SessionOptions::new()
        .with_queue_config(
            SessionQueueConfig::default()
                .with_lane_features()
                .with_timeout(60_000)
        )
))?;

// 2. Route Execute lane to external handlers
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 60_000,
}).await;

// 3. Stream the prompt — agent will produce ExternalTask objects for bash/write/edit
let (mut rx, _handle) = session.stream("Run the full test suite, build for release, and deploy to staging").await?;

// 4. Event loop: dispatch external tasks to workers, feed results back
while let Some(event) = rx.recv().await {
    match event {
        AgentEvent::ExternalTaskPending { .. } => {
            // Poll all pending tasks and dispatch to remote workers
            let tasks = session.pending_external_tasks().await;
            for task in tasks {
                let session = session.clone();
                tokio::spawn(async move {
                    // dispatch_to_worker is your transport layer
                    // (gRPC, HTTP, Redis pub/sub, message queue, etc.)
                    let worker_result = dispatch_to_worker(&task).await;

                    session.complete_external_task(&task.task_id, ExternalTaskResult {
                        success: worker_result.success,
                        result: serde_json::json!({
                            "output": worker_result.output,
                            "exit_code": worker_result.exit_code,
                        }),
                        error: worker_result.error,
                    }).await;
                });
            }
        }
        AgentEvent::TextDelta { text } => print!("{text}"),
        AgentEvent::End { .. } => break,
        _ => {}
    }
}

TypeScript

const { Agent } = require('@a3s-lab/code');

const agent = await Agent.create('agent.hcl');

// 1. Create session with queue
const session = agent.session('/project', {
  queueConfig: { enableAllFeatures: true, defaultTimeoutMs: 60000 },
});

// 2. Route Execute lane to external handlers
await session.setLaneHandler('execute', {
  mode: 'external',
  timeoutMs: 60000,
});

// 3. Stream and handle external tasks
const events = await session.stream(
  'Run the full test suite, build for release, and deploy to staging'
);
for (const event of events) {
  if (event.type === 'external_task_pending') {
    const tasks = await session.pendingExternalTasks();
    await Promise.all(tasks.map(async (task) => {
      // dispatchToWorker is your transport layer
      const workerResult = await dispatchToWorker(task);

      await session.completeExternalTask(task.task_id, {
        success: workerResult.success,
        result: { output: workerResult.output, exit_code: workerResult.exitCode },
        error: workerResult.error,
      });
    }));
  }
  if (event.type === 'text_delta') process.stdout.write(event.text);
  if (event.type === 'end') break;
}

Python

import asyncio
from a3s_code import Agent, SessionQueueConfig

agent = Agent.create("agent.hcl")

# 1. Create session with queue
qc = SessionQueueConfig()
qc.with_lane_features()
qc.set_default_timeout(60000)
session = agent.session("/project", queue_config=qc)

# 2. Route Execute lane to external handlers
session.set_lane_handler("execute", mode="external", timeout_ms=60000)

# 3. Stream and handle external tasks
for event in session.stream("Run the full test suite, build for release, and deploy to staging"):
    if event.event_type == "external_task_pending":
        tasks = session.pending_external_tasks()

        async def handle(task):
            # dispatch_to_worker is your transport layer
            worker_result = await dispatch_to_worker(task)
            session.complete_external_task(task["task_id"], {
                "success": worker_result.success,
                "result": {"output": worker_result.output, "exit_code": worker_result.exit_code},
                "error": worker_result.error,
            })

        async def handle_all():
            # asyncio.run needs a coroutine, and gather needs a running
            # event loop, so wrap the fan-out in one coroutine
            await asyncio.gather(*(handle(t) for t in tasks))

        asyncio.run(handle_all())
    elif event.event_type == "text_delta":
        print(event.text, end="", flush=True)
    elif event.event_type == "end":
        break

Worker Process (Remote Machine)

The worker is a standalone process that receives tasks via your transport layer, executes them, and returns results:

// worker.rs — runs on each remote machine
use std::process::Command;

/// Receives ExternalTask payloads, executes locally, returns results
async fn worker_loop(mut receiver: impl TaskReceiver) {
    while let Some(task_envelope) = receiver.next().await {
        let task = task_envelope.task; // ExternalTask fields

        let result = match task.command_type.as_str() {
            "bash" => {
                let cmd = task.payload["command"].as_str().unwrap_or("");
                let output = Command::new("sh")
                    .arg("-c")
                    .arg(cmd)
                    .current_dir(task.payload["working_dir"].as_str().unwrap_or("."))
                    .output();
                match output {
                    Ok(out) => WorkerResult {
                        success: out.status.success(),
                        output: String::from_utf8_lossy(&out.stdout).to_string(),
                        exit_code: out.status.code().unwrap_or(1),
                        error: if out.status.success() { None }
                               else { Some(String::from_utf8_lossy(&out.stderr).to_string()) },
                    },
                    Err(e) => WorkerResult {
                        success: false, output: String::new(), exit_code: 1,
                        error: Some(e.to_string()),
                    },
                }
            }
            _ => WorkerResult {
                success: false, output: String::new(), exit_code: 1,
                error: Some(format!("Worker does not handle: {}", task.command_type)),
            },
        };

        // Send result back to coordinator
        task_envelope.respond(result).await;
    }
}

The transport layer between coordinator and workers is your choice — A3S Code provides the ExternalTask/ExternalTaskResult contract, you provide the plumbing (gRPC, HTTP, Redis, NATS, SQS, etc.).
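For local testing, the plumbing can be as small as a pair of in-process channels. A self-contained sketch of the dispatch/respond round trip; the struct fields are illustrative, not the crate's types, and a real deployment would replace the channels with gRPC, HTTP, or a message broker:

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative stand-ins for the task/result shapes on the wire.
struct TaskMsg { task_id: String, command: String }
struct WorkerResult { task_id: String, success: bool, output: String }

/// Dispatch one command to a worker thread and wait for its result.
fn round_trip(command: &str) -> WorkerResult {
    let (task_tx, task_rx) = mpsc::channel::<TaskMsg>();
    let (res_tx, res_rx) = mpsc::channel::<WorkerResult>();

    // "Worker": drains the task channel, executes, responds.
    let worker = thread::spawn(move || {
        for task in task_rx {
            res_tx.send(WorkerResult {
                task_id: task.task_id,
                success: true,
                output: format!("ran: {}", task.command),
            }).unwrap();
        }
    });

    // "Coordinator": dispatch, then close the channel so the worker exits.
    task_tx.send(TaskMsg { task_id: "t1".into(), command: command.into() }).unwrap();
    drop(task_tx);

    let result = res_rx.recv().unwrap();
    worker.join().unwrap();
    result
}

fn main() {
    let r = round_trip("cargo test");
    assert_eq!(r.task_id, "t1");
    assert!(r.success);
    assert_eq!(r.output, "ran: cargo test");
}
```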

Hybrid Mode: Local Execution + External Notification

Use Hybrid mode when you want local execution as the default path but also need external visibility — for monitoring, audit logging, or shadow processing on remote machines:

Rust

// Hybrid: execute locally, also notify external watchers
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::Hybrid,
    timeout_ms: 60_000,
}).await;

// The agent runs normally — tools execute locally as default.
// ExternalTaskPending events are still emitted, so external watchers
// can observe what's happening (e.g., for audit or metrics).
let result = session.send("Run cargo test and fix any failures").await?;

TypeScript

// Hybrid: execute locally, also notify external watchers
await session.setLaneHandler('execute', { mode: 'hybrid', timeoutMs: 60000 });

// Tools run locally. ExternalTaskPending events emitted for observation.
const result = await session.send('Run cargo test and fix any failures');

Python

# Hybrid: execute locally, also notify external watchers
session.set_lane_handler("execute", mode="hybrid", timeout_ms=60000)

# Tools run locally. ExternalTaskPending events emitted for observation.
result = session.send("Run cargo test and fix any failures")

Dynamic Lane Switching

Lanes can be switched between modes at runtime. This enables adaptive strategies — start in Internal mode, switch to External when workers are available:

// Start with local execution (default)
let result = session.send("Analyze the codebase").await?;

// Workers have come online — switch to external for heavy execution
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 60_000,
}).await;

// Now bash/write/edit tasks route to remote workers
let result = session.send("Run the full CI pipeline across all packages").await?;

// Workers going offline — switch back to local
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::Internal,
    timeout_ms: 60_000,
}).await;

Planning + External Execution

When planning is enabled, each plan step goes through the same agent loop and lane queue. This means externally-handled lanes work transparently with execution plans:

let config = AgentConfig {
    planning_enabled: true,
    // ...
};

// Set Execute lane to External
session.set_lane_handler(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 120_000,
}).await;

// The planner decomposes the task into steps.
// Each step's tool calls route through the lane queue.
// bash/write/edit calls in each step → ExternalTask → workers
let (mut rx, _handle) = session.stream(
    "Refactor the auth module to use JWT, update all tests, and verify CI passes"
).await?;

while let Some(event) = rx.recv().await {
    match event {
        AgentEvent::TaskUpdated { tasks, .. } => {
            // Track plan progress
            let done = tasks.iter().filter(|t| t.status == TaskStatus::Completed).count();
            println!("Plan progress: {}/{} steps", done, tasks.len());
        }
        AgentEvent::ExternalTaskPending { .. } => {
            // Dispatch to workers — same pattern as above
            for task in session.pending_external_tasks().await {
                let session = session.clone();
                tokio::spawn(async move {
                    let result = dispatch_to_worker(&task).await;
                    session.complete_external_task(&task.task_id, result.into()).await;
                });
            }
        }
        AgentEvent::End { .. } => break,
        _ => {}
    }
}

Configuration Reference

pub struct SessionQueueConfig {
    // Concurrency limits
    pub control_max_concurrency: usize,    // Default: 2
    pub query_max_concurrency: usize,      // Default: 4
    pub execute_max_concurrency: usize,    // Default: 2
    pub generate_max_concurrency: usize,   // Default: 1

    // Lane handlers
    pub lane_handlers: HashMap<SessionLane, LaneHandlerConfig>,  // Per-lane mode config

    // Basic features
    pub enable_dlq: bool,                  // Default: false
    pub dlq_max_size: Option<usize>,       // Default: 1000 (when enabled)
    pub enable_metrics: bool,              // Default: false
    pub enable_alerts: bool,               // Default: false
    pub default_timeout_ms: Option<u64>,   // Default: 60000 (when enabled)
    pub storage_path: Option<PathBuf>,     // Default: None (in-memory only)

    // Advanced features (A3S Lane v0.4.0)
    pub retry_policy: Option<RetryPolicyConfig>,
    pub rate_limit: Option<RateLimitConfig>,
    pub priority_boost: Option<PriorityBoostConfig>,
    pub pressure_threshold: Option<usize>,
    pub lane_timeouts: HashMap<SessionLane, u64>,
}

pub struct RetryPolicyConfig {
    pub strategy: String,        // "exponential", "fixed", "none"
    pub max_retries: u32,
    pub initial_delay_ms: u64,
    pub fixed_delay_ms: Option<u64>,  // For "fixed" strategy
}

pub struct RateLimitConfig {
    pub limit_type: String,      // "per_second", "per_minute", "per_hour", "unlimited"
    pub max_operations: Option<u64>,
}

pub struct PriorityBoostConfig {
    pub strategy: String,        // "standard", "aggressive", "disabled"
    pub deadline_ms: Option<u64>,
}

pub struct LaneHandlerConfig {
    pub mode: TaskHandlerMode,  // Internal | External | Hybrid
    pub timeout_ms: u64,        // Default: 60000
}

Lane handlers can also be pre-configured in SessionQueueConfig instead of calling set_lane_handler() at runtime:

use std::collections::HashMap;

let mut lane_handlers = HashMap::new();
lane_handlers.insert(SessionLane::Execute, LaneHandlerConfig {
    mode: TaskHandlerMode::External,
    timeout_ms: 60_000,
});

let config = SessionQueueConfig {
    lane_handlers,
    enable_dlq: true,
    enable_metrics: true,
    ..Default::default()
};

let session = agent.session("/project", Some(
    SessionOptions::new().with_queue_config(config)
))?;

Backward Compatibility

When no queue_config is set in SessionOptions, the queue is not created and all tools execute sequentially via direct ToolExecutor calls, exactly as before. The queue is purely opt-in.
