A3S Docs
A3S Lane

Commands

Define, submit, and track command execution through the queue

Commands

Commands are the unit of work in A3S Lane. In Rust, any struct implementing the Command trait can be submitted. In Python and Node.js, you submit a (lane_id, command_type, payload) tuple directly — no trait implementation required.

Command Trait (Rust)

use a3s_lane::Result;
use async_trait::async_trait;
use serde_json::Value;

#[async_trait]
pub trait Command: Send + Sync {
    async fn execute(&self) -> Result<Value>;
    fn command_type(&self) -> &str;
}

Commands must be Send + Sync since they execute on Tokio tasks. The return type is always serde_json::Value.

Defining Commands

struct AnalyzeText {
    text: String,
    model: String,
}

#[async_trait]
impl Command for AnalyzeText {
    async fn execute(&self) -> Result<Value> {
        let analysis = run_analysis(&self.text, &self.model).await?;
        Ok(serde_json::json!({
            "sentiment": analysis.sentiment,
            "topics": analysis.topics,
            "confidence": analysis.confidence,
        }))
    }

    fn command_type(&self) -> &str { "analyze_text" }
}

command_type() labels the command in event payloads, dead letter queue entries, storage records, and metrics.

Submitting Commands

let rx = manager.submit("query", Box::new(AnalyzeText {
    text: "A3S Lane is fast".into(),
    model: "sentiment-v2".into(),
})).await?;

// Block until the command completes
let result = rx.await??;
println!("{}", result);

submit() returns a oneshot::Receiver<Result<Value>>. The ?? unwraps the channel (RecvError) and the command result (LaneError).

Fire and forget (drop the receiver):

let _ = manager.submit("background", Box::new(MyCmd {})).await?;

Error handling:

match rx.await {
    Ok(Ok(value)) => println!("Success: {}", value),
    Ok(Err(e)) => match e {
        LaneError::Timeout(d)          => println!("Timed out after {:?}", d),
        LaneError::CommandError(msg)   => println!("Command failed: {}", msg),
        LaneError::LaneNotFound(id)    => println!("No such lane: {}", id),
        LaneError::ShutdownInProgress  => println!("Queue is shutting down"),
        _ => println!("Error: {}", e),
    },
    Err(_) => println!("Channel dropped"),
}
from a3s_lane import Lane

lane = Lane()
lane.start()

# submit(lane_id, command_type, payload) — blocks until the queue slot is acquired.
# The queue enforces priority ordering and concurrency limits.
# Returns the payload dict as confirmation.
result = lane.submit("query", "analyze_text", {
    "text": "A3S Lane is fast",
    "model": "sentiment-v2",
})
print(result)
const { Lane } = require('@a3s-lab/lane');

const lane = new Lane();
lane.start();

// submit(laneId, commandType, jsonPayload) — blocks until the queue slot is acquired.
// Returns a JSON string result.
const result = JSON.parse(lane.submit(
  'query',
  'analyze_text',
  JSON.stringify({ text: 'A3S Lane is fast', model: 'sentiment-v2' })
));
console.log(result);

Execution Lifecycle

submit() → Pending Queue → Scheduler → Execute → Result
                               │             │
                               │        on failure
                               │             │
                               │   Retry? ──yes──→ Re-enqueue with delay
                               │      │
                               │     no
                               │      │
                               │      ↓
                               │   Dead Letter Queue

                          on timeout


                        LaneError::Timeout
  1. Submission — Command enqueued. Persisted to storage if configured.
  2. Scheduling — Background loop (10ms) picks the highest-priority lane with available capacity.
  3. Executionexecute() runs in a Tokio task, wrapped with timeout if configured.
  4. Completion — Result sent to the oneshot receiver. Command removed from storage.
  5. Retry — On failure with retries remaining, re-enqueued after a delay.
  6. Dead Letter — If all retries exhausted, moved to the DLQ.

Queue Stats

let stats = manager.stats().await?;
println!("Total pending: {}", stats.total_pending);
println!("Total active:  {}", stats.total_active);
println!("Dead letters:  {}", stats.dead_letter_count);

Prop

Type

On this page