A3S Lane
Commands
Define, submit, and track command execution through the queue
Commands
Commands are the unit of work in A3S Lane. In Rust, any struct implementing the Command trait can be submitted. In Python and Node.js, you submit a (lane_id, command_type, payload) tuple directly — no trait implementation required.
Command Trait (Rust)
use a3s_lane::Result;
use async_trait::async_trait;
use serde_json::Value;
#[async_trait]
pub trait Command: Send + Sync {
async fn execute(&self) -> Result<Value>;
fn command_type(&self) -> &str;
}Commands must be Send + Sync since they execute on Tokio tasks. The return type is always serde_json::Value.
Defining Commands
struct AnalyzeText {
text: String,
model: String,
}
#[async_trait]
impl Command for AnalyzeText {
async fn execute(&self) -> Result<Value> {
let analysis = run_analysis(&self.text, &self.model).await?;
Ok(serde_json::json!({
"sentiment": analysis.sentiment,
"topics": analysis.topics,
"confidence": analysis.confidence,
}))
}
fn command_type(&self) -> &str { "analyze_text" }
}command_type() labels the command in event payloads, dead letter queue entries, storage records, and metrics.
Submitting Commands
let rx = manager.submit("query", Box::new(AnalyzeText {
text: "A3S Lane is fast".into(),
model: "sentiment-v2".into(),
})).await?;
// Block until the command completes
let result = rx.await??;
println!("{}", result);submit() returns a oneshot::Receiver<Result<Value>>. The ?? unwraps the channel (RecvError) and the command result (LaneError).
Fire and forget (drop the receiver):
let _ = manager.submit("background", Box::new(MyCmd {})).await?;Error handling:
match rx.await {
Ok(Ok(value)) => println!("Success: {}", value),
Ok(Err(e)) => match e {
LaneError::Timeout(d) => println!("Timed out after {:?}", d),
LaneError::CommandError(msg) => println!("Command failed: {}", msg),
LaneError::LaneNotFound(id) => println!("No such lane: {}", id),
LaneError::ShutdownInProgress => println!("Queue is shutting down"),
_ => println!("Error: {}", e),
},
Err(_) => println!("Channel dropped"),
}from a3s_lane import Lane
lane = Lane()
lane.start()
# submit(lane_id, command_type, payload) — blocks until the queue slot is acquired.
# The queue enforces priority ordering and concurrency limits.
# Returns the payload dict as confirmation.
result = lane.submit("query", "analyze_text", {
"text": "A3S Lane is fast",
"model": "sentiment-v2",
})
print(result)const { Lane } = require('@a3s-lab/lane');
const lane = new Lane();
lane.start();
// submit(laneId, commandType, jsonPayload) — blocks until the queue slot is acquired.
// Returns a JSON string result.
const result = JSON.parse(lane.submit(
'query',
'analyze_text',
JSON.stringify({ text: 'A3S Lane is fast', model: 'sentiment-v2' })
));
console.log(result);Execution Lifecycle
submit() → Pending Queue → Scheduler → Execute → Result
│ │
│ on failure
│ │
│ Retry? ──yes──→ Re-enqueue with delay
│ │
│ no
│ │
│ ↓
│ Dead Letter Queue
│
on timeout
│
↓
LaneError::Timeout- Submission — Command enqueued. Persisted to storage if configured.
- Scheduling — Background loop (10ms) picks the highest-priority lane with available capacity.
- Execution —
execute()runs in a Tokio task, wrapped with timeout if configured. - Completion — Result sent to the oneshot receiver. Command removed from storage.
- Retry — On failure with retries remaining, re-enqueued after a delay.
- Dead Letter — If all retries exhausted, moved to the DLQ.
Queue Stats
let stats = manager.stats().await?;
println!("Total pending: {}", stats.total_pending);
println!("Total active: {}", stats.total_active);
println!("Dead letters: {}", stats.dead_letter_count);Prop
Type