A3S Docs
A3S Lane

Quick Start

Get up and running with A3S Lane in any supported language

Quick Start

Installation

[dependencies]
a3s-lane = "0.4"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

All four features (distributed, metrics, monitoring, telemetry) are enabled by default. To build only the core queue, disable default features:

a3s-lane = { version = "0.4", default-features = false }
pip install a3s-lane

Python 3.9–3.13. Pre-built wheels for Linux (glibc/musl), macOS, and Windows.

npm install @a3s-lab/lane

Node.js 16+. Native binary for Linux, macOS, and Windows (x64/arm64).

Basic Usage

1. Define a Command

Implement the Command trait for your task:

use a3s_lane::{Command, Result};
use async_trait::async_trait;
use serde_json::Value;

/// Example command: carries the URL that the task is meant to fetch.
struct FetchData { url: String }

#[async_trait]
impl Command for FetchData {
    /// Runs the command. This example stands in for real work by returning
    /// the stored URL wrapped in a JSON object.
    async fn execute(&self) -> Result<Value> {
        // perform the actual work
        Ok(serde_json::json!({ "url": self.url }))
    }
    /// Identifier string for this kind of command.
    fn command_type(&self) -> &str { "fetch_data" }
}

2. Build and Start

use a3s_lane::{EventEmitter, QueueManagerBuilder};

// Event emitter handed to the builder. The argument 100 is presumably the
// event buffer capacity — confirm against the EventEmitter API docs.
let emitter = EventEmitter::new(100);
// Configure the default lanes, then build the manager.
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .build().await?;

// Start the manager; commands are submitted in the next step.
manager.start().await?;

3. Submit and Await

// Submit the boxed command to the "query" lane; `submit` hands back a
// receiver for the eventual result.
let rx = manager.submit("query", Box::new(FetchData {
    url: "https://api.example.com/data".into(),
})).await?;

// Awaiting the receiver yields a nested result: the first `?` covers the
// channel, the second covers the command's own result.
let result = rx.await??;
println!("{result}");

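submit() returns a oneshot::Receiver<Result<Value>>. Awaiting it yields a nested result, so two ? operators are needed: the first unwraps the channel receive, and the second unwraps the command's own result.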

4. Shutdown

use std::time::Duration;
// Shut down first, then drain with a 30-second limit.
manager.shutdown().await;
manager.drain(Duration::from_secs(30)).await?;
from a3s_lane import Lane

lane = Lane()   # default lanes: system, control, query, session, skill, prompt
lane.start()

# Submit schedules work in priority order and blocks until it runs.
# It returns the payload as confirmation; the queue itself contributes
# scheduling, concurrency control, and rate limiting.
# Arguments as used here: lane name, command type, payload.
result = lane.submit("query", "fetch_data", {"url": "https://api.example.com/data"})

# Subscribe to lifecycle events
stream = lane.subscribe()
# recv is called with a 1000 ms timeout; the truthiness check below skips
# the print when nothing usable came back.
event = stream.recv(timeout_ms=1000)
if event:
    print(f"[{event.timestamp}] {event.key}")

# Shutdown, then drain with a 30-second timeout
lane.shutdown()
lane.drain(timeout_secs=30.0)
const { Lane } = require('@a3s-lab/lane');

const lane = new Lane();   // default lanes: system, control, query, session, skill, prompt
lane.start();

// Submit schedules work in priority order. The payload is passed as a JSON
// string and the result comes back as a JSON string, hence the
// JSON.stringify / JSON.parse pair.
const result = JSON.parse(
  lane.submit('query', 'fetch_data', JSON.stringify({ url: 'https://api.example.com/data' }))
);

// Subscribe to lifecycle events (error-first callback)
lane.subscribe((err, event) => {
  if (err) throw err;
  console.log(`[${event.timestamp}] ${event.key}`);
});

// Shutdown, then drain.
// NOTE(review): 30_000 is presumably milliseconds (30 s, matching the
// 30-second drain in the other language examples) — confirm.
lane.shutdown();
lane.drain(30_000);

Custom Lanes

use a3s_lane::{QueueManagerBuilder, LaneConfig};

// Two custom lanes. Going by the Python and Node.js versions of this example,
// LaneConfig::new takes (min_concurrency, max_concurrency) and the trailing
// with_lane argument is the lane priority.
let manager = QueueManagerBuilder::new(emitter)
    .with_lane("high",  LaneConfig::new(1, 4), 0)
    .with_lane("low",   LaneConfig::new(1, 2), 1)
    .build().await?;
from a3s_lane import Lane, LaneConfig

# Build a Lane from explicit lane definitions instead of the defaults; each
# LaneConfig names the lane and sets its priority and concurrency bounds.
lane = Lane.with_lanes([
    LaneConfig("high", priority=0, min_concurrency=1, max_concurrency=4),
    LaneConfig("low",  priority=1, min_concurrency=1, max_concurrency=2),
])
lane.start()
const { Lane } = require('@a3s-lab/lane');

// Build a Lane from explicit lane definitions instead of the defaults; each
// entry names the lane and sets its priority and concurrency bounds.
const lane = Lane.withLanes([
  { laneId: 'high', priority: 0, minConcurrency: 1, maxConcurrency: 4 },
  { laneId: 'low',  priority: 1, minConcurrency: 1, maxConcurrency: 2 },
]);
lane.start();

With Reliability (Rust)

Add a timeout, retries, a dead letter queue, and persistent storage:

use a3s_lane::*;
use std::{sync::Arc, time::Duration, path::PathBuf};

// Persistent storage rooted at ./queue, shared with the manager via Arc.
let storage = Arc::new(LocalStorage::new(PathBuf::from("./queue")).await?);

let manager = QueueManagerBuilder::new(emitter)
    .with_lane(
        "api",
        LaneConfig::new(1, 10)
            // 30-second timeout for this lane.
            .with_timeout(Duration::from_secs(30))
            // Exponential retry policy; the argument 3 is presumably the
            // maximum number of retries — confirm.
            .with_retry_policy(RetryPolicy::exponential(3)),
        0,
    )
    // Dead letter queue; 100 is presumably its capacity — confirm.
    .with_dlq(100)
    .with_storage(storage)
    .build().await?;

See Reliability for details.

With Observability (Rust)

Add metrics and alerts:

use a3s_lane::*;
use std::sync::Arc;

// Attach local metrics and queue-depth alerts to the default lanes.
// NOTE(review): 100 and 500 are presumably the warning and critical depth
// thresholds — confirm against the AlertManager docs.
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .with_metrics(QueueMetrics::local())
    .with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(100, 500)))
    .build().await?;

// Metrics were configured with `with_metrics` above, so a missing handle here
// would be a bug; state that invariant instead of using a bare `unwrap()`.
let snapshot = manager
    .metrics()
    .expect("metrics are enabled via with_metrics")
    .snapshot()
    .await;
println!("Submitted: {:?}", snapshot.counters.get("lane.commands.submitted"));

See Observability for details.

On this page