A3S Lane
Quick Start
Get up and running with A3S Lane in any supported language
Quick Start
Installation
[dependencies]
a3s-lane = "0.4"
tokio = { version = "1", features = ["full"] }
serde_json = "1"All four features (distributed, metrics, monitoring, telemetry) are on by default. Core queue only:
a3s-lane = { version = "0.4", default-features = false }pip install a3s-lanePython 3.9–3.13. Pre-built wheels for Linux (glibc/musl), macOS, and Windows.
npm install @a3s-lab/laneNode.js 16+. Native binary for Linux, macOS, and Windows (x64/arm64).
Basic Usage
1. Define a Command
Implement the Command trait for your task:
use a3s_lane::{Command, Result};
use async_trait::async_trait;
use serde_json::Value;
struct FetchData { url: String }
#[async_trait]
impl Command for FetchData {
async fn execute(&self) -> Result<Value> {
// perform the actual work
Ok(serde_json::json!({ "url": self.url }))
}
fn command_type(&self) -> &str { "fetch_data" }
}2. Build and Start
use a3s_lane::{EventEmitter, QueueManagerBuilder};
let emitter = EventEmitter::new(100);
let manager = QueueManagerBuilder::new(emitter)
.with_default_lanes()
.build().await?;
manager.start().await?;3. Submit and Await
let rx = manager.submit("query", Box::new(FetchData {
url: "https://api.example.com/data".into(),
})).await?;
let result = rx.await??;
println!("{result}");submit() returns a oneshot::Receiver<Result<Value>>. The ?? unwraps the channel and the command result.
4. Shutdown
use std::time::Duration;
manager.shutdown().await;
manager.drain(Duration::from_secs(30)).await?;from a3s_lane import Lane
lane = Lane() # default lanes: system, control, query, session, skill, prompt
lane.start()
# Submit schedules work in priority order and blocks until it runs.
# Returns the payload as confirmation — the queue provides scheduling,
# concurrency control, and rate limiting.
result = lane.submit("query", "fetch_data", {"url": "https://api.example.com/data"})
# Subscribe to lifecycle events
stream = lane.subscribe()
event = stream.recv(timeout_ms=1000)
if event:
print(f"[{event.timestamp}] {event.key}")
# Shutdown
lane.shutdown()
lane.drain(timeout_secs=30.0)const { Lane } = require('@a3s-lab/lane');
const lane = new Lane(); // default lanes: system, control, query, session, skill, prompt
lane.start();
// Submit schedules work in priority order. Returns JSON string result.
const result = JSON.parse(
lane.submit('query', 'fetch_data', JSON.stringify({ url: 'https://api.example.com/data' }))
);
// Subscribe to lifecycle events
lane.subscribe((err, event) => {
if (err) throw err;
console.log(`[${event.timestamp}] ${event.key}`);
});
// Shutdown
lane.shutdown();
lane.drain(30_000);Custom Lanes
use a3s_lane::{QueueManagerBuilder, LaneConfig};
let manager = QueueManagerBuilder::new(emitter)
.with_lane("high", LaneConfig::new(1, 4), 0)
.with_lane("low", LaneConfig::new(1, 2), 1)
.build().await?;from a3s_lane import Lane, LaneConfig
lane = Lane.with_lanes([
LaneConfig("high", priority=0, min_concurrency=1, max_concurrency=4),
LaneConfig("low", priority=1, min_concurrency=1, max_concurrency=2),
])
lane.start()const { Lane } = require('@a3s-lab/lane');
const lane = Lane.withLanes([
{ laneId: 'high', priority: 0, minConcurrency: 1, maxConcurrency: 4 },
{ laneId: 'low', priority: 1, minConcurrency: 1, maxConcurrency: 2 },
]);
lane.start();With Reliability (Rust)
Add timeout, retry, dead letter queue, and persistent storage:
use a3s_lane::*;
use std::{sync::Arc, time::Duration, path::PathBuf};
let storage = Arc::new(LocalStorage::new(PathBuf::from("./queue")).await?);
let manager = QueueManagerBuilder::new(emitter)
.with_lane(
"api",
LaneConfig::new(1, 10)
.with_timeout(Duration::from_secs(30))
.with_retry_policy(RetryPolicy::exponential(3)),
0,
)
.with_dlq(100)
.with_storage(storage)
.build().await?;See Reliability for details.
With Observability (Rust)
Add metrics and alerts:
use a3s_lane::*;
use std::sync::Arc;
let manager = QueueManagerBuilder::new(emitter)
.with_default_lanes()
.with_metrics(QueueMetrics::local())
.with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(100, 500)))
.build().await?;
let snapshot = manager.metrics().unwrap().snapshot().await;
println!("Submitted: {:?}", snapshot.counters.get("lane.commands.submitted"));See Observability for details.