Scalability
Rate limiting, priority boosting, queue partitioning, and distributed queues
A3S Lane includes features for scaling from a single-core prototype to multi-core and multi-machine deployments: rate limiting, priority boosting, queue partitioning, and a distributed queue interface.
Rate Limiting
Control command throughput per lane using token bucket or sliding window algorithms.
Token Bucket
```rust
// 100 commands per second
let config = LaneConfig::new(1, 10)
    .with_rate_limit(RateLimitConfig::per_second(100));

// 1000 commands per minute
let config = LaneConfig::new(1, 10)
    .with_rate_limit(RateLimitConfig::per_minute(1000));

// 10000 commands per hour
let config = LaneConfig::new(1, 10)
    .with_rate_limit(RateLimitConfig::per_hour(10000));
```
The token bucket provides smooth rate limiting: tokens are replenished at a constant rate, and each command consumes one token. If no tokens are available, the command waits.
Priority Boosting
Prevent starvation of low-priority commands by automatically escalating their priority as a deadline approaches.
Standard Boosting
Boosts at 25%, 50%, 75% of the deadline:
```rust
let config = LaneConfig::new(1, 10)
    .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(60)));
```
For a 60-second deadline:
- At 15s: first boost
- At 30s: second boost
- At 45s: third boost (near highest priority)
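Those boost points are simply fixed fractions (25%, 50%, 75%) of the deadline. A quick standalone sketch — the helper name is illustrative, not the crate's API:

```rust
use std::time::Duration;

/// Boost points at 25%, 50%, and 75% of the deadline,
/// mirroring the standard boosting schedule.
fn standard_boost_points(deadline: Duration) -> Vec<Duration> {
    [0.25, 0.50, 0.75]
        .iter()
        .map(|f| Duration::from_secs_f64(deadline.as_secs_f64() * f))
        .collect()
}
```

For `Duration::from_secs(60)` this yields boosts at 15s, 30s, and 45s, matching the list above.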
Aggressive Boosting
More frequent escalation for time-sensitive workloads:
```rust
let config = LaneConfig::new(1, 10)
    .with_priority_boost(PriorityBoostConfig::aggressive(Duration::from_secs(30)));
```
Custom Boost Intervals
Define exact boost points:
```rust
let config = LaneConfig::new(1, 10)
    .with_priority_boost(PriorityBoostConfig::custom(
        Duration::from_secs(120),
        vec![
            Duration::from_secs(30), // First boost at 30s
            Duration::from_secs(60), // Second boost at 60s
            Duration::from_secs(90), // Third boost at 90s
        ],
    ));
```
Queue Partitioning
Distribute commands across multiple partitions for multi-core parallelism.
Partition Strategies
```rust
use a3s_lane::PartitionConfig;

// Automatic: one partition per CPU core
let config = PartitionConfig::auto();

// Round-robin: distribute commands evenly across N partitions
let config = PartitionConfig::round_robin(4);

// Hash: same command type always goes to the same partition
let config = PartitionConfig::hash(4);

// None: single partition (default)
let config = PartitionConfig::none();
```
Strategy Comparison
| Strategy | Behavior |
| --- | --- |
| `auto()` | One partition per CPU core |
| `round_robin(n)` | Commands distributed evenly across n partitions |
| `hash(n)` | The same command type always maps to the same partition |
| `none()` | Single partition (the default) |
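To see why hash partitioning pins a command type to one partition, here is a standalone sketch using the standard library hasher (illustrative only; the crate's actual hashing scheme may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a command type name onto one of `partitions` slots.
/// Hashing is deterministic, so the same name always lands
/// in the same partition.
fn partition_for(command_type: &str, partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    command_type.hash(&mut hasher);
    (hasher.finish() as usize) % partitions
}
```

This is why hash partitioning preserves per-type ordering: all commands of one type flow through a single partition, while different types still spread across cores.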
Distributed Queue
For multi-machine deployments, implement the DistributedQueue trait:
```rust
use a3s_lane::{DistributedQueue, CommandEnvelope, CommandResult};

#[async_trait]
pub trait DistributedQueue: Send + Sync {
    async fn enqueue(&self, envelope: CommandEnvelope) -> Result<()>;
    async fn dequeue(&self, partition_id: PartitionId) -> Result<Option<CommandEnvelope>>;
    async fn complete(&self, result: CommandResult) -> Result<()>;
    fn num_partitions(&self) -> usize;
    fn worker_id(&self) -> WorkerId;
}
```
Local Implementation
For single-machine multi-core parallelism:
```rust
use a3s_lane::LocalDistributedQueue;

// Auto-detect CPU cores
let queue = LocalDistributedQueue::auto();

// Explicit partition count
let queue = LocalDistributedQueue::new(PartitionConfig::round_robin(8));
```
Custom Implementation
Implement DistributedQueue for Redis, Kafka, or other message brokers:
```rust
struct RedisDistributedQueue {
    client: redis::Client,
    worker_id: WorkerId,
    partitions: usize,
}

#[async_trait]
impl DistributedQueue for RedisDistributedQueue {
    async fn enqueue(&self, envelope: CommandEnvelope) -> Result<()> {
        let partition = envelope.partition_id % self.partitions;
        let key = format!("lane:partition:{}", partition);
        // LPUSH to Redis list
        Ok(())
    }

    async fn dequeue(&self, partition_id: PartitionId) -> Result<Option<CommandEnvelope>> {
        let key = format!("lane:partition:{}", partition_id);
        // BRPOP from Redis list
        Ok(None)
    }

    async fn complete(&self, result: CommandResult) -> Result<()> {
        // Store result, notify waiters
        Ok(())
    }

    fn num_partitions(&self) -> usize { self.partitions }

    fn worker_id(&self) -> WorkerId { self.worker_id }
}
```
Scaling Recommendations
Single Machine
```rust
// Small workload: default lanes, no partitioning
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .build()
    .await?;

// High throughput: rate limiting + metrics
let manager = QueueManagerBuilder::new(emitter)
    .with_lane("api", LaneConfig::new(2, 20)
        .with_rate_limit(RateLimitConfig::per_second(1000)), 0)
    .with_metrics(QueueMetrics::local())
    .build()
    .await?;
```
Multi-Core
```rust
// Use auto-partitioning for CPU-bound workloads
let queue = LocalDistributedQueue::auto();
```
Production Checklist
- Configure rate limits on lanes that face external or bursty traffic
- Enable priority boosting so low-priority commands cannot starve
- Partition queues (auto or explicit) to use all available CPU cores
- Enable metrics (`QueueMetrics::local()`) to track throughput
- For multi-machine deployments, back the lanes with a `DistributedQueue` implementation