Lanes
Priority-based lanes with configurable concurrency, timeout, retry, rate limiting, pressure tracking, and priority boosting
A lane is a priority queue that holds pending commands and enforces concurrency limits. Each lane has a unique ID, a priority level, and a configuration that controls timeout, retry, rate limiting, priority boosting, and pressure tracking.
Built-in Lanes
Call with_default_lanes() on the builder to register the 6 standard lanes:
let manager = QueueManagerBuilder::new(emitter)
.with_default_lanes()
.build()
.await?;
Custom Lanes
Register lanes with custom priorities and configuration:
let manager = QueueManagerBuilder::new(emitter)
.with_lane("critical", LaneConfig::new(2, 8), 0)
.with_lane("normal", LaneConfig::new(1, 16), 1)
.with_lane("background", LaneConfig::new(1, 4), 2)
.build()
.await?;
Mix built-in and custom lanes:
let manager = QueueManagerBuilder::new(emitter)
.with_default_lanes()
.with_lane("batch", LaneConfig::new(1, 20), 6)
.build()
.await?;
from a3s_lane import Lane, LaneConfig
# Default lanes
lane = Lane()
lane.start()
# Custom lanes
lane = Lane.with_lanes([
LaneConfig("critical", priority=0, min_concurrency=2, max_concurrency=8),
LaneConfig("normal", priority=1, min_concurrency=1, max_concurrency=16),
LaneConfig("background", priority=2, min_concurrency=1, max_concurrency=4),
])
lane.start()
LaneConfig fields: lane_id, priority, min_concurrency, max_concurrency, timeout_secs (optional), pressure_threshold (optional).
const { Lane } = require('@a3s-lab/lane');
// Default lanes
const lane = new Lane();
lane.start();
// Custom lanes
const customLane = Lane.withLanes([
{ laneId: 'critical', priority: 0, minConcurrency: 2, maxConcurrency: 8 },
{ laneId: 'normal', priority: 1, minConcurrency: 1, maxConcurrency: 16 },
{ laneId: 'background', priority: 2, minConcurrency: 1, maxConcurrency: 4 },
]);
customLane.start();
Config fields: laneId, priority, minConcurrency, maxConcurrency, timeoutSecs (optional), pressureThreshold (optional).
LaneConfig (Rust)
LaneConfig controls all per-lane behavior. Create with new(min, max) and chain builder methods:
use std::time::Duration;
let config = LaneConfig::new(2, 10)
.with_timeout(Duration::from_secs(30))
.with_retry_policy(RetryPolicy::exponential(3))
.with_pressure_threshold(50) // emit queue.lane.pressure / queue.lane.idle
.with_rate_limit(RateLimitConfig::per_second(100)) // requires `distributed` feature
.with_priority_boost(PriorityBoostConfig::standard( // requires `distributed` feature
Duration::from_secs(60),
));
Concurrency
The semaphore ensures no more than max_concurrency commands from a lane execute simultaneously.
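The cap described above can be pictured with a small counting semaphore. This is a minimal std-only sketch, not the library's implementation: `LaneSemaphore` and its methods are hypothetical names, and an async runtime would more likely use an async-aware semaphore.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical counting semaphore that caps concurrent commands in one lane.
/// Illustrative only; not a type exported by the library.
pub struct LaneSemaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl LaneSemaphore {
    /// Start with `max_concurrency` free permits.
    pub fn new(max_concurrency: usize) -> Self {
        Self {
            permits: Mutex::new(max_concurrency),
            available: Condvar::new(),
        }
    }

    /// Block until a permit is free, then take it.
    pub fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Take a permit only if one is free right now.
    pub fn try_acquire(&self) -> bool {
        let mut permits = self.permits.lock().unwrap();
        if *permits == 0 {
            return false;
        }
        *permits -= 1;
        true
    }

    /// Return a permit and wake one waiter, if any.
    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.available.notify_one();
    }
}
```

With `LaneSemaphore::new(2)`, a third `try_acquire()` fails until one of the first two holders calls `release()`.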
Timeout
let config = LaneConfig::new(1, 10)
.with_timeout(Duration::from_secs(30));
Commands exceeding the timeout receive LaneError::Timeout. If retries are configured, the command is re-enqueued; otherwise it moves to the DLQ (if enabled) or the error is returned to the caller.
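The three possible outcomes of a timeout can be written as a small decision function. This is a sketch under assumptions: `TimeoutOutcome`, `on_timeout`, and its parameters are hypothetical names, and it assumes that a command whose retries are exhausted falls through to the DLQ or error path in the same way as a command with no retries configured.

```rust
/// Hypothetical outcome of a timed-out command; names are illustrative only.
#[derive(Debug, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// Put the command back on its lane for another attempt.
    Requeue,
    /// Move the command to the dead letter queue.
    DeadLetter,
    /// Hand the timeout error back to the caller.
    ReturnError,
}

/// Decide what happens after a timeout.
/// `retries_used` counts retries already spent; `max_retries` is 0 when no
/// retry policy is configured (an assumption of this sketch).
pub fn on_timeout(retries_used: u32, max_retries: u32, dlq_enabled: bool) -> TimeoutOutcome {
    if retries_used < max_retries {
        TimeoutOutcome::Requeue
    } else if dlq_enabled {
        TimeoutOutcome::DeadLetter
    } else {
        TimeoutOutcome::ReturnError
    }
}
```

For example, `on_timeout(0, 3, true)` re-enqueues the command, while `on_timeout(0, 0, false)` returns the error to the caller.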
Retry Policy
// Exponential backoff: 100ms → 200ms → 400ms (cap 30s)
LaneConfig::new(1, 10).with_retry_policy(RetryPolicy::exponential(3))
// Fixed delay: 1s between each retry
LaneConfig::new(1, 10).with_retry_policy(RetryPolicy::fixed(5, Duration::from_secs(1)))
// No retries (default)
LaneConfig::new(1, 10).with_retry_policy(RetryPolicy::none())
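The exponential schedule in the comment above (100ms → 200ms → 400ms, capped at 30s) can be reproduced with a few lines of arithmetic. The sketch assumes a 100ms base and a doubling factor, both read off that comment; `exponential_delay` is a hypothetical helper, and whether the library adds jitter is not stated here.

```rust
use std::time::Duration;

/// Base delay and cap taken from the "100ms → 200ms → 400ms (cap 30s)" comment.
const BASE_DELAY: Duration = Duration::from_millis(100);
const MAX_DELAY: Duration = Duration::from_secs(30);

/// Delay before retry number `retry` (0-based), doubling each time and
/// saturating at `MAX_DELAY`. Hypothetical helper, not a library function.
pub fn exponential_delay(retry: u32) -> Duration {
    // 2^retry, clamped so very large retry counts cannot overflow.
    let factor = 2u32.checked_pow(retry).unwrap_or(u32::MAX);
    BASE_DELAY.saturating_mul(factor).min(MAX_DELAY)
}
```

Under these assumptions the delay reaches the 30s cap on the tenth retry (index 9), since 100ms × 2⁹ = 51.2s.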
Rate Limiting
LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_second(100))
LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_minute(1000))
LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_hour(10_000))
Requires the distributed feature (on by default). Rate is enforced at dequeue time; commands are held until a token is available.
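Token-based limiting at dequeue time can be illustrated with a simple token bucket. This is a sketch, not the library's limiter: `TokenBucket` is a hypothetical type, the burst capacity is assumed to equal the per-second rate, and time is passed in explicitly so the behaviour is easy to reason about.

```rust
use std::time::Duration;

/// Hypothetical token bucket; the library's limiter may work differently.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    /// Time of the last refill, measured from the bucket's creation.
    last_refill: Duration,
}

impl TokenBucket {
    /// Allow `rate` dequeues per second, starting with a full bucket.
    pub fn per_second(rate: u32) -> Self {
        let rate = f64::from(rate);
        Self {
            capacity: rate,
            tokens: rate,
            refill_per_sec: rate,
            last_refill: Duration::ZERO,
        }
    }

    /// Try to take one token at time `now` (elapsed since creation).
    /// Returns false when the command should stay queued.
    pub fn try_take(&mut self, now: Duration) -> bool {
        if now > self.last_refill {
            let elapsed = (now - self.last_refill).as_secs_f64();
            self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
            self.last_refill = now;
        }
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

A scheduler built this way would call `try_take` before dequeuing and leave the command in the lane when it returns false.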
Priority Boosting
// Standard: boost at 75%, 50%, 25% of deadline remaining
LaneConfig::new(1, 10).with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(60)))
// Aggressive: more frequent escalation
LaneConfig::new(1, 10).with_priority_boost(PriorityBoostConfig::aggressive(Duration::from_secs(30)))
// Disabled
LaneConfig::new(1, 10).with_priority_boost(PriorityBoostConfig::disabled())
Requires the distributed feature.
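To make the standard thresholds concrete, the sketch below counts how many of the 75%, 50%, and 25% marks a waiting command has passed. The function names are hypothetical, and the sketch assumes each boost raises the command by one priority level (lowers the number by one); the actual boost size is not specified in this section.

```rust
use std::time::Duration;

/// Fractions of the deadline remaining at which the "standard" profile boosts.
const STANDARD_THRESHOLDS: [f64; 3] = [0.75, 0.50, 0.25];

/// Number of boosts a command has received after waiting `waited` out of `deadline`.
pub fn boosts_applied(waited: Duration, deadline: Duration) -> usize {
    if deadline.is_zero() {
        return STANDARD_THRESHOLDS.len();
    }
    let used = (waited.as_secs_f64() / deadline.as_secs_f64()).min(1.0);
    let remaining = 1.0 - used;
    STANDARD_THRESHOLDS.iter().filter(|&&t| remaining <= t).count()
}

/// Effective priority, assuming one level per boost (lower number = higher priority).
pub fn effective_priority(base: u8, waited: Duration, deadline: Duration) -> u8 {
    base.saturating_sub(boosts_applied(waited, deadline) as u8)
}
```

With a 60s deadline, a priority-3 command would be treated as priority 2 after 15s, priority 1 after 30s, and priority 0 after 45s under these assumptions.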
Pressure Tracking
Emit queue.lane.pressure when the pending queue crosses a threshold, and queue.lane.idle when it drains back to zero:
LaneConfig::new(1, 10).with_pressure_threshold(50)
- queue.lane.pressure fires on the first crossing from below threshold to pending >= 50
- queue.lane.idle fires when pending returns to 0 after being pressured
- No threshold set → no pressure events emitted
See Events → Lane Pressure for the full event spec.
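The pressure and idle rules above amount to a two-state machine. The sketch below models them with hypothetical names (`PressureTracker`, `PressureEvent`, `observe`); it illustrates the described behaviour only and is not the library's internal code.

```rust
/// Events corresponding to queue.lane.pressure and queue.lane.idle.
#[derive(Debug, PartialEq, Eq)]
pub enum PressureEvent {
    Pressure,
    Idle,
}

/// Hypothetical tracker for one lane's pending count.
pub struct PressureTracker {
    threshold: Option<usize>,
    pressured: bool,
}

impl PressureTracker {
    /// `None` means no threshold was configured, so nothing is ever emitted.
    pub fn new(threshold: Option<usize>) -> Self {
        Self { threshold, pressured: false }
    }

    /// Feed the current pending count; returns the event to emit, if any.
    pub fn observe(&mut self, pending: usize) -> Option<PressureEvent> {
        let threshold = self.threshold?;
        if !self.pressured && pending >= threshold {
            // First crossing from below the threshold.
            self.pressured = true;
            Some(PressureEvent::Pressure)
        } else if self.pressured && pending == 0 {
            // Queue drained after having been pressured.
            self.pressured = false;
            Some(PressureEvent::Idle)
        } else {
            None
        }
    }
}
```

Note that dropping below the threshold without reaching zero emits nothing: the idle event fires only once the queue is fully drained.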
Lane Status
Check the current state of each lane:
let stats = manager.stats().await?;
for (lane_id, status) in &stats.lanes {
println!("{}: {} pending, {} active (max {})",
lane_id, status.pending, status.active, status.max);
}
Scheduling Behavior
The scheduler runs a 10ms background loop:
- Sort lanes by priority (lowest number = highest priority)
- Skip lanes at max concurrency
- Dequeue and spawn the next command from the highest-priority lane with work
- Repeat until no lanes have pending commands
Higher-priority lanes are always serviced first. Within a lane, commands execute in FIFO order.
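One pass of the loop described above can be sketched as follows. `LaneState` and `schedule_pass` are hypothetical names, commands are reduced to integer IDs, and the sketch does not model command completion, so a pass ends as soon as every lane is either empty or at its concurrency limit.

```rust
use std::collections::VecDeque;

/// Hypothetical snapshot of one lane; not a type exported by the library.
pub struct LaneState {
    pub id: &'static str,
    pub priority: u8,
    pub max_concurrency: usize,
    pub active: usize,
    /// Pending command IDs in FIFO order.
    pub pending: VecDeque<u32>,
}

/// Run one scheduling pass and return (lane id, command id) in spawn order.
pub fn schedule_pass(lanes: &mut [LaneState]) -> Vec<(&'static str, u32)> {
    // Lowest number = highest priority.
    lanes.sort_by_key(|lane| lane.priority);
    let mut spawned = Vec::new();
    loop {
        // Highest-priority lane that has work and a free slot.
        let next = lanes
            .iter_mut()
            .find(|lane| lane.active < lane.max_concurrency && !lane.pending.is_empty());
        match next {
            Some(lane) => {
                if let Some(command) = lane.pending.pop_front() {
                    lane.active += 1;
                    spawned.push((lane.id, command));
                }
            }
            None => break,
        }
    }
    spawned
}
```

In a real scheduler this pass would run on each tick of the 10ms loop, with `active` decreasing as spawned commands finish.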