Lanes

Priority-based lanes with configurable concurrency, timeout, retry, rate limiting, pressure tracking, and priority boosting

A lane is a priority queue that holds pending commands and enforces concurrency limits. Each lane has a unique ID, a priority level, and a configuration that controls timeout, retry, rate limiting, priority boosting, and pressure tracking.

Built-in Lanes

Call with_default_lanes() on the builder to register the 6 standard lanes:

let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .build()
    .await?;

Custom Lanes

Register lanes with custom priorities and configuration:

let manager = QueueManagerBuilder::new(emitter)
    .with_lane("critical",   LaneConfig::new(2, 8),  0)
    .with_lane("normal",     LaneConfig::new(1, 16), 1)
    .with_lane("background", LaneConfig::new(1, 4),  2)
    .build()
    .await?;

Mix built-in and custom lanes:

let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .with_lane("batch", LaneConfig::new(1, 20), 6)
    .build()
    .await?;

Python:

from a3s_lane import Lane, LaneConfig

# Default lanes
lane = Lane()
lane.start()

# Custom lanes
lane = Lane.with_lanes([
    LaneConfig("critical",   priority=0, min_concurrency=2, max_concurrency=8),
    LaneConfig("normal",     priority=1, min_concurrency=1, max_concurrency=16),
    LaneConfig("background", priority=2, min_concurrency=1, max_concurrency=4),
])
lane.start()

LaneConfig fields: lane_id, priority, min_concurrency, max_concurrency, timeout_secs (optional), pressure_threshold (optional).

Node.js:

const { Lane } = require('@a3s-lab/lane');

// Default lanes
const lane = new Lane();
lane.start();

// Custom lanes (a separate binding, since `lane` is already declared above)
const customLane = Lane.withLanes([
  { laneId: 'critical',   priority: 0, minConcurrency: 2, maxConcurrency: 8  },
  { laneId: 'normal',     priority: 1, minConcurrency: 1, maxConcurrency: 16 },
  { laneId: 'background', priority: 2, minConcurrency: 1, maxConcurrency: 4  },
]);
customLane.start();

Config fields: laneId, priority, minConcurrency, maxConcurrency, timeoutSecs (optional), pressureThreshold (optional).

LaneConfig (Rust)

LaneConfig controls all per-lane behavior. Create one with LaneConfig::new(min, max), where min and max are the lane's minimum and maximum concurrency, then chain builder methods:

use std::time::Duration;

let config = LaneConfig::new(2, 10)
    .with_timeout(Duration::from_secs(30))
    .with_retry_policy(RetryPolicy::exponential(3))
    .with_pressure_threshold(50)                        // emit queue.lane.pressure / queue.lane.idle
    .with_rate_limit(RateLimitConfig::per_second(100))  // requires `distributed` feature
    .with_priority_boost(PriorityBoostConfig::standard( // requires `distributed` feature
        Duration::from_secs(60),
    ));

Concurrency

Each lane is guarded by a semaphore, which ensures that no more than max_concurrency commands from that lane execute simultaneously.
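
As a rough illustration of how a semaphore caps concurrency, here is a minimal std-only sketch. The names LaneSemaphore and Permit are hypothetical and are not part of the a3s_lane API; the crate's actual semaphore may behave differently (for example, it may wait asynchronously rather than fail fast).

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical counting semaphore for one lane (illustration only).
struct LaneSemaphore {
    max: usize,
    active: Arc<AtomicUsize>,
}

/// RAII permit: dropping it frees one execution slot.
struct Permit {
    active: Arc<AtomicUsize>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::AcqRel);
    }
}

impl LaneSemaphore {
    fn new(max: usize) -> Self {
        LaneSemaphore { max, active: Arc::new(AtomicUsize::new(0)) }
    }

    /// Returns a permit if fewer than `max` commands are running, else None.
    fn try_acquire(&self) -> Option<Permit> {
        let mut current = self.active.load(Ordering::Acquire);
        loop {
            if current >= self.max {
                return None; // lane is at max concurrency
            }
            match self.active.compare_exchange_weak(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(Permit { active: Arc::clone(&self.active) }),
                Err(observed) => current = observed, // lost a race, retry
            }
        }
    }

    /// Number of permits currently held.
    fn active(&self) -> usize {
        self.active.load(Ordering::Acquire)
    }
}
```

In this sketch a command would hold its Permit for the duration of execution, so the slot is released even if the command returns early or panics.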

Timeout

let config = LaneConfig::new(1, 10)
    .with_timeout(Duration::from_secs(30));

Commands exceeding the timeout fail with LaneError::Timeout. If retries are configured, the command is re-enqueued; otherwise it moves to the dead-letter queue (DLQ), if enabled, or the error is returned to the caller.
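
The decision described above can be sketched as a small pure function. This is an illustration under assumptions, not the crate's code: TimeoutOutcome and on_timeout are hypothetical names, and the sketch assumes that a command whose retries are exhausted falls through to the same DLQ-or-error path as a command with no retries configured.

```rust
/// Hypothetical outcome of a timed-out command (illustration only).
#[derive(Debug, PartialEq)]
enum TimeoutOutcome {
    /// Put the command back on the lane's queue.
    Retry,
    /// Move the command to the dead-letter queue.
    DeadLetter,
    /// Surface the timeout error to the caller.
    ReturnError,
}

/// Decide what happens after a timeout.
/// `retries_used` counts retries already performed for this command.
fn on_timeout(retries_used: u32, max_retries: u32, dlq_enabled: bool) -> TimeoutOutcome {
    if retries_used < max_retries {
        TimeoutOutcome::Retry
    } else if dlq_enabled {
        TimeoutOutcome::DeadLetter
    } else {
        TimeoutOutcome::ReturnError
    }
}
```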

Retry Policy

// Exponential backoff: 100ms → 200ms → 400ms (cap 30s)
LaneConfig::new(1, 10).with_retry_policy(RetryPolicy::exponential(3))

// Fixed delay: 1s between each retry
LaneConfig::new(1, 10).with_retry_policy(RetryPolicy::fixed(5, Duration::from_secs(1)))

// No retries (default)
LaneConfig::new(1, 10).with_retry_policy(RetryPolicy::none())
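
To make the exponential schedule in the first comment concrete (100ms, 200ms, 400ms, capped at 30s), here is a minimal sketch of the delay arithmetic. The function backoff_delay is hypothetical and only mirrors the numbers quoted above; it does not claim to be how RetryPolicy computes delays internally (jitter, for instance, is not modeled).

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (1-based): `base` doubled per attempt,
/// never exceeding `cap`. Hypothetical helper, for illustration only.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // 2^(attempt - 1), saturating so large attempt counts cannot overflow.
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    base.saturating_mul(factor).min(cap)
}
```

With a base of 100ms and a cap of 30s, attempts 1, 2, and 3 yield 100ms, 200ms, and 400ms, and any attempt whose doubled delay would exceed 30s yields 30s.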

Rate Limiting

LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_second(100))
LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_minute(1000))
LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_hour(10_000))

Requires the distributed feature (on by default). The rate limit is enforced at dequeue time: commands stay queued until a token is available.
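
The "token" wording suggests a token-bucket style limiter. The sketch below shows that general technique with std only; TokenBucket is a hypothetical type, and its burst size (equal to the per-second rate) is an assumption rather than documented behavior of RateLimitConfig. Time is passed in explicitly so the logic is easy to test.

```rust
use std::time::Instant;

/// Hypothetical token bucket (illustration only, not the crate's limiter).
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Bucket that allows `rate` commands per second and starts full.
    fn per_second(rate: u32, now: Instant) -> Self {
        let rate = f64::from(rate);
        TokenBucket { capacity: rate, tokens: rate, refill_per_sec: rate, last_refill: now }
    }

    /// Called at dequeue time. Returns true if a token was taken;
    /// false means the command should stay queued for now.
    fn try_acquire(&mut self, now: Instant) -> bool {
        if now > self.last_refill {
            // Add tokens for the elapsed time, never exceeding capacity.
            let elapsed = now.duration_since(self.last_refill).as_secs_f64();
            self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
            self.last_refill = now;
        }
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```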

Priority Boosting

// Standard: boost at 75%, 50%, 25% of deadline remaining
LaneConfig::new(1, 10).with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(60)))

// Aggressive: more frequent escalation
LaneConfig::new(1, 10).with_priority_boost(PriorityBoostConfig::aggressive(Duration::from_secs(30)))

// Disabled
LaneConfig::new(1, 10).with_priority_boost(PriorityBoostConfig::disabled())

Requires the distributed feature.
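
As an illustration of the standard thresholds (75%, 50%, and 25% of the deadline remaining), the sketch below counts how many thresholds a waiting command has crossed. The function boost_steps is hypothetical; how the crate translates a crossed threshold into an actual priority change is not specified here and is left out.

```rust
use std::time::Duration;

/// Number of boost thresholds crossed (0 to 3) for a command that has
/// waited `waited` out of a total `deadline`. Illustration only.
fn boost_steps(deadline: Duration, waited: Duration) -> u32 {
    if deadline.is_zero() {
        return 3; // assumption: a zero deadline counts as fully elapsed
    }
    // Fraction of the deadline still remaining, clamped to [0, 1].
    let used = (waited.as_secs_f64() / deadline.as_secs_f64()).min(1.0);
    let remaining = 1.0 - used;
    [0.75, 0.50, 0.25]
        .iter()
        .filter(|&&threshold| remaining <= threshold)
        .count() as u32
}
```

With a 60-second deadline, a command crosses the first threshold after waiting 15 seconds, the second after 30 seconds, and the third after 45 seconds.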

Pressure Tracking

Set a pressure threshold to have the lane emit queue.lane.pressure when its pending queue reaches the threshold, and queue.lane.idle when the queue drains back to zero:

LaneConfig::new(1, 10).with_pressure_threshold(50)

  • queue.lane.pressure fires on the first crossing from below threshold to pending >= 50
  • queue.lane.idle fires when pending returns to 0 after being pressured
  • No threshold set → no pressure events emitted

See Events → Lane Pressure for the full event spec.
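
The three rules above amount to a small two-state machine. The following sketch models it with std only; PressureTracker and PressureEvent are hypothetical names used for illustration, not types exported by the crate.

```rust
/// Hypothetical event kinds, standing in for queue.lane.pressure / queue.lane.idle.
#[derive(Debug, PartialEq)]
enum PressureEvent {
    Pressure,
    Idle,
}

/// Tracks whether a lane is currently "pressured" (illustration only).
struct PressureTracker {
    threshold: Option<usize>,
    pressured: bool,
}

impl PressureTracker {
    fn new(threshold: Option<usize>) -> Self {
        PressureTracker { threshold, pressured: false }
    }

    /// Feed the current pending count; returns an event only on a transition.
    fn observe(&mut self, pending: usize) -> Option<PressureEvent> {
        // No threshold set: never emit pressure events.
        let threshold = self.threshold?;
        if !self.pressured && pending >= threshold {
            self.pressured = true;
            Some(PressureEvent::Pressure) // first crossing from below
        } else if self.pressured && pending == 0 {
            self.pressured = false;
            Some(PressureEvent::Idle) // drained after being pressured
        } else {
            None
        }
    }
}
```

Note that in this model a queue that dips below the threshold without reaching zero emits nothing, and a later rise back above the threshold does not re-fire the pressure event.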

Lane Status

Check the current state of each lane:

let stats = manager.stats().await?;
for (lane_id, status) in &stats.lanes {
    println!("{}: {} pending, {} active (max {})",
        lane_id, status.pending, status.active, status.max);
}

Scheduling Behavior

The scheduler runs a background loop with a 10 ms interval. On each pass it does the following:

  1. Sort lanes by priority (lowest number = highest priority)
  2. Skip lanes at max concurrency
  3. Dequeue and spawn the next command from the highest-priority lane with work
  4. Repeat until no lanes have pending commands

Higher-priority lanes are always serviced first, unless they are already at max concurrency. Within a lane, commands are dequeued in FIFO order.
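
The steps above can be sketched as a single synchronous scheduling pass. This is a simplified model, not the crate's scheduler: the Lane struct and schedule_pass function are hypothetical, commands are plain strings, "spawning" is modeled as incrementing an active counter, and the pass stops once no lane has both pending work and a free slot.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified lane state (illustration only).
struct Lane {
    id: &'static str,
    priority: u8, // lower number = higher priority
    max_concurrency: usize,
    active: usize,
    pending: VecDeque<String>, // FIFO queue of commands
}

/// One scheduling pass. Returns the (lane id, command) pairs in dispatch order.
fn schedule_pass(lanes: &mut [Lane]) -> Vec<(&'static str, String)> {
    // Step 1: sort lanes by priority.
    lanes.sort_by_key(|lane| lane.priority);
    let mut dispatched = Vec::new();
    loop {
        // Steps 2 and 3: first lane in priority order that has pending
        // work and is below its concurrency limit.
        let next = lanes
            .iter_mut()
            .find(|lane| lane.active < lane.max_concurrency && !lane.pending.is_empty());
        match next {
            Some(lane) => {
                let command = lane.pending.pop_front().expect("queue checked non-empty");
                lane.active += 1; // stands in for spawning the command
                dispatched.push((lane.id, command));
            }
            // Step 4: nothing left that can be dispatched in this pass.
            None => break,
        }
    }
    dispatched
}
```

In this model a high-priority lane that has reached its limit is skipped, which lets lower-priority lanes make progress until a slot frees up.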
