A3S Docs
A3S Lane

Scalability

Rate limiting, priority boosting, queue partitioning, and distributed queues

Scalability

A3S Lane includes features for scaling from a single-core prototype to multi-core and multi-machine deployments: rate limiting, priority boosting, queue partitioning, and a distributed queue interface.

Rate Limiting

Control command throughput per lane using token bucket or sliding window algorithms.

Token Bucket

// 100 commands per second
let config = LaneConfig::new(1, 10)
    .with_rate_limit(RateLimitConfig::per_second(100));

// 1000 commands per minute
let config = LaneConfig::new(1, 10)
    .with_rate_limit(RateLimitConfig::per_minute(1000));

// 10000 commands per hour
let config = LaneConfig::new(1, 10)
    .with_rate_limit(RateLimitConfig::per_hour(10000));

Token bucket provides smooth rate limiting - tokens are replenished at a constant rate and each command consumes one token. If no tokens are available, the command waits.

Priority Boosting

Prevent starvation of low-priority commands by automatically escalating their priority as a deadline approaches.

Standard Boosting

Boosts at 25%, 50%, 75% of the deadline:

let config = LaneConfig::new(1, 10)
    .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(60)));

For a 60-second deadline:

  • At 15s: first boost
  • At 30s: second boost
  • At 45s: third boost (near highest priority)

Aggressive Boosting

More frequent escalation for time-sensitive workloads:

let config = LaneConfig::new(1, 10)
    .with_priority_boost(PriorityBoostConfig::aggressive(Duration::from_secs(30)));

Custom Boost Intervals

Define exact boost points:

let config = LaneConfig::new(1, 10)
    .with_priority_boost(PriorityBoostConfig::custom(
        Duration::from_secs(120),
        vec![
            Duration::from_secs(30),  // First boost at 30s
            Duration::from_secs(60),  // Second boost at 60s
            Duration::from_secs(90),  // Third boost at 90s
        ],
    ));

Queue Partitioning

Distribute commands across multiple partitions for multi-core parallelism.

Partition Strategies

use a3s_lane::PartitionConfig;

// Automatic: one partition per CPU core
let config = PartitionConfig::auto();

// Round-robin: distribute commands evenly across N partitions
let config = PartitionConfig::round_robin(4);

// Hash: same command type always goes to the same partition
let config = PartitionConfig::hash(4);

// None: single partition (default)
let config = PartitionConfig::none();

Strategy Comparison

Prop

Type

Distributed Queue

For multi-machine deployments, implement the DistributedQueue trait:

use a3s_lane::{DistributedQueue, CommandEnvelope, CommandResult};

#[async_trait]
pub trait DistributedQueue: Send + Sync {
    async fn enqueue(&self, envelope: CommandEnvelope) -> Result<()>;
    async fn dequeue(&self, partition_id: PartitionId) -> Result<Option<CommandEnvelope>>;
    async fn complete(&self, result: CommandResult) -> Result<()>;
    fn num_partitions(&self) -> usize;
    fn worker_id(&self) -> WorkerId;
}

Local Implementation

For single-machine multi-core parallelism:

use a3s_lane::LocalDistributedQueue;

// Auto-detect CPU cores
let queue = LocalDistributedQueue::auto();

// Explicit partition count
let queue = LocalDistributedQueue::new(PartitionConfig::round_robin(8));

Custom Implementation

Implement DistributedQueue for Redis, Kafka, or other message brokers:

struct RedisDistributedQueue {
    client: redis::Client,
    worker_id: WorkerId,
    partitions: usize,
}

#[async_trait]
impl DistributedQueue for RedisDistributedQueue {
    async fn enqueue(&self, envelope: CommandEnvelope) -> Result<()> {
        let partition = envelope.partition_id % self.partitions;
        let key = format!("lane:partition:{}", partition);
        // LPUSH to Redis list
        Ok(())
    }

    async fn dequeue(&self, partition_id: PartitionId) -> Result<Option<CommandEnvelope>> {
        let key = format!("lane:partition:{}", partition_id);
        // BRPOP from Redis list
        Ok(None)
    }

    async fn complete(&self, result: CommandResult) -> Result<()> {
        // Store result, notify waiters
        Ok(())
    }

    fn num_partitions(&self) -> usize { self.partitions }
    fn worker_id(&self) -> WorkerId { self.worker_id }
}

Scaling Recommendations

Single Machine

// Small workload: default lanes, no partitioning
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .build()
    .await?;

// High throughput: rate limiting + metrics
let manager = QueueManagerBuilder::new(emitter)
    .with_lane("api", LaneConfig::new(2, 20)
        .with_rate_limit(RateLimitConfig::per_second(1000)), 0)
    .with_metrics(QueueMetrics::local())
    .build()
    .await?;

Multi-Core

// Use auto-partitioning for CPU-bound workloads
let queue = LocalDistributedQueue::auto();

Production Checklist

Prop

Type

On this page