Reliability

Timeout, retry, dead letter queue, persistent storage, and graceful shutdown

A3S Lane provides multiple reliability layers: per-command timeouts, configurable retry policies, a dead letter queue for permanently failed commands, persistent storage for crash recovery, and graceful shutdown.

Timeout, retry, DLQ, and persistent storage are Rust-only features. Graceful shutdown (drain) is available in all SDKs.

Timeout

Set a per-lane timeout so no command runs indefinitely:

use std::time::Duration;

let config = LaneConfig::new(1, 10)
    .with_timeout(Duration::from_secs(30));

When a command exceeds the timeout, it fails with LaneError::Timeout(duration). If a retry policy is configured, the command is retried. Otherwise it goes to the dead letter queue (if enabled) or the error is returned to the caller.
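The timeout-then-route flow described above can be sketched in plain std Rust. run_with_timeout, FailureRoute, and route_failure are hypothetical names and are not part of a3s_lane. The sketch also assumes that the retry count means retries after the first attempt.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `f` on a worker thread and wait at most `timeout` for its result.
/// Err carries the timeout, loosely mirroring LaneError::Timeout(duration).
/// Note: a std thread cannot be cancelled, so a timed-out `f` keeps running.
fn run_with_timeout<T, F>(timeout: Duration, f: F) -> Result<T, Duration>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may be gone after a timeout; ignore the send error.
        let _ = tx.send(f());
    });
    // A panic in `f` drops the sender, which is also reported as Err here.
    rx.recv_timeout(timeout).map_err(|_| timeout)
}

/// What happens after a failed attempt (hypothetical, not an a3s_lane type).
#[derive(Debug, PartialEq)]
enum FailureRoute {
    Retry { next_attempt: u32 },
    DeadLetter,
    ReturnError,
}

/// `attempt` is 1-based; `max_retries` counts retries after the first try.
fn route_failure(attempt: u32, max_retries: u32, dlq_enabled: bool) -> FailureRoute {
    if attempt <= max_retries {
        FailureRoute::Retry { next_attempt: attempt + 1 }
    } else if dlq_enabled {
        FailureRoute::DeadLetter
    } else {
        FailureRoute::ReturnError
    }
}
```

With three retries and the DLQ enabled, a command that keeps timing out is attempted four times before route_failure sends it to the dead letter queue.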

Retry Policies

Exponential Backoff

let config = LaneConfig::new(1, 10)
    .with_retry_policy(RetryPolicy::exponential(3));
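This page does not spell out the delays behind RetryPolicy::exponential(3). A common scheme, assumed here, takes the argument as the maximum number of retries, doubles a base delay on each retry, and caps the result. The 100 ms base and 5 s cap used below are illustrative, and exponential_delay and exponential_schedule are hypothetical helpers.

```rust
use std::time::Duration;

/// Delay before retry number `retry` (1-based): base * 2^(retry - 1),
/// capped at `max`. Base, factor, and cap are illustrative assumptions.
fn exponential_delay(retry: u32, base: Duration, max: Duration) -> Duration {
    // Saturating math so very large retry numbers cannot overflow.
    let factor = 2u32.saturating_pow(retry.saturating_sub(1));
    base.saturating_mul(factor).min(max)
}

/// Every delay a command would wait through for `max_retries` retries.
fn exponential_schedule(max_retries: u32, base: Duration, max: Duration) -> Vec<Duration> {
    (1..=max_retries)
        .map(|retry| exponential_delay(retry, base, max))
        .collect()
}
```

Under these assumptions, three retries wait 100 ms, 200 ms, and 400 ms. The cap keeps long retry chains from growing delays without bound.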

Fixed Delay

use std::time::Duration;

let config = LaneConfig::new(1, 10)
    .with_retry_policy(RetryPolicy::fixed(5, Duration::from_secs(1)));
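A fixed-delay policy waits the same interval before every retry. This sketch assumes fixed(5, Duration::from_secs(1)) means up to five retries after the first attempt, spaced one second apart. retry_fixed is a hypothetical helper, and it blocks the calling thread with std::thread::sleep for simplicity.

```rust
use std::thread;
use std::time::Duration;

/// Run `op` once, then retry up to `max_retries` more times, sleeping a
/// fixed `delay` between attempts. Returns the last result and how many
/// attempts were made. `op` receives the 1-based attempt number.
fn retry_fixed<T, E>(
    max_retries: u32,
    delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return (Ok(value), attempt),
            // Retries exhausted: surface the final error.
            Err(e) if attempt > max_retries => return (Err(e), attempt),
            Err(_) => {
                thread::sleep(delay);
                attempt += 1;
            }
        }
    }
}
```

Under this reading, a command that never succeeds with max_retries = 5 is attempted six times and waits through five delays.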

No Retries (Default)

let config = LaneConfig::new(1, 10)
    .with_retry_policy(RetryPolicy::none());

Dead Letter Queue

Commands that exhaust all retries move to the dead letter queue (DLQ).

Enable DLQ

let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .with_dlq(100)  // max 100 dead letters
    .build()
    .await?;
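with_dlq(100) caps the queue at 100 letters, but this page does not say what happens once it is full. The sketch below assumes a drop-oldest policy built on a VecDeque. BoundedDlq and this DeadLetter are hypothetical stand-ins, and their field names follow the Inspect Dead Letters example.

```rust
use std::collections::VecDeque;

/// Simplified dead letter; field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct DeadLetter {
    command_type: String,
    lane_id: String,
    error: String,
    attempts: u32,
}

/// Bounded DLQ sketch: when `capacity` is reached, the oldest letter is
/// evicted to make room (an assumed policy, not documented behavior).
struct BoundedDlq {
    capacity: usize,
    letters: VecDeque<DeadLetter>,
}

impl BoundedDlq {
    fn new(capacity: usize) -> Self {
        Self { capacity, letters: VecDeque::with_capacity(capacity) }
    }

    /// Add a letter; returns the evicted letter, if any.
    fn push(&mut self, letter: DeadLetter) -> Option<DeadLetter> {
        if self.capacity == 0 {
            return Some(letter); // nothing can be stored
        }
        let evicted = if self.letters.len() == self.capacity {
            self.letters.pop_front()
        } else {
            None
        };
        self.letters.push_back(letter);
        evicted
    }

    fn len(&self) -> usize {
        self.letters.len()
    }
}
```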

Inspect Dead Letters

if let Some(dlq) = manager.queue().dlq() {
    for letter in dlq.list().await {
        println!("[{}] {} in '{}': {} (attempts: {})",
            letter.failed_at,
            letter.command_type,
            letter.lane_id,
            letter.error,
            letter.attempts,
        );
    }
}

DeadLetter Fields

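Each DeadLetter carries the fields printed in the example above: failed_at (when the command failed permanently), command_type, lane_id (the lane it ran in), error (the final error), and attempts (how many times it was tried).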

DLQ Operations

let letter = dlq.pop().await;    // pop one (for retry or inspection)
dlq.clear().await;               // clear all
let count = dlq.len().await;
let empty = dlq.is_empty().await;
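pop() removes letters one at a time, so a replay loop can drain the DLQ and hand each letter back for resubmission. This sketch uses a synchronous VecDeque in place of the async DLQ. replay_dead_letters and the trimmed DeadLetter struct are hypothetical, and resubmission is modelled as a callback that reports whether it succeeded.

```rust
use std::collections::VecDeque;

/// Trimmed dead letter stand-in (hypothetical; the real type has more fields).
#[derive(Debug)]
struct DeadLetter {
    command_type: String,
    attempts: u32,
}

/// Pop letters until the queue is empty (like repeated `pop()` calls) and
/// pass each to `resubmit`. Letters that cannot be resubmitted are returned
/// so the caller can log them or push them back.
fn replay_dead_letters(
    dlq: &mut VecDeque<DeadLetter>,
    mut resubmit: impl FnMut(&DeadLetter) -> bool,
) -> Vec<DeadLetter> {
    let mut still_failed = Vec::new();
    while let Some(letter) = dlq.pop_front() {
        if !resubmit(&letter) {
            still_failed.push(letter);
        }
    }
    still_failed
}
```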

Persistent Storage

Enable storage so pending commands survive process restarts.

Local Filesystem Storage

use std::path::PathBuf;
use std::sync::Arc;

let storage = Arc::new(
    LocalStorage::new(PathBuf::from("./queue-data")).await?
);

let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .with_storage(storage)
    .with_dlq(100)
    .build()
    .await?;

LocalStorage persists commands as JSON files with an in-memory cache for fast reads.
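The write-through pattern behind LocalStorage can be sketched with std::fs and a HashMap:

- each save goes to disk before the cache;
- reads are served from memory;
- opening the store rebuilds the cache from files left by a previous run.

FileCache is hypothetical. To keep the sketch dependency-free, it writes raw payload text to .cmd files instead of JSON.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical write-through store: one file per command plus a memory cache.
struct FileCache {
    dir: PathBuf,
    cache: HashMap<String, String>,
}

impl FileCache {
    /// Create `dir` if needed and load every `<id>.cmd` file into the cache.
    fn open(dir: PathBuf) -> io::Result<Self> {
        fs::create_dir_all(&dir)?;
        let mut cache = HashMap::new();
        for entry in fs::read_dir(&dir)? {
            let path = entry?.path();
            // Only pick up files this sketch wrote.
            if path.extension().and_then(|e| e.to_str()) != Some("cmd") {
                continue;
            }
            if let Some(id) = path.file_stem().and_then(|s| s.to_str()) {
                cache.insert(id.to_string(), fs::read_to_string(&path)?);
            }
        }
        Ok(Self { dir, cache })
    }

    fn path_for(&self, id: &str) -> PathBuf {
        // Assumes ids are safe to use as file names.
        self.dir.join(format!("{}.cmd", id))
    }

    /// Write-through: persist first, then update the cache.
    fn save(&mut self, id: &str, payload: &str) -> io::Result<()> {
        fs::write(self.path_for(id), payload)?;
        self.cache.insert(id.to_string(), payload.to_string());
        Ok(())
    }

    /// Delete the file (a missing file is fine) and evict from the cache.
    fn remove(&mut self, id: &str) -> io::Result<()> {
        match fs::remove_file(self.path_for(id)) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
        self.cache.remove(id);
        Ok(())
    }

    /// Reads never touch the disk.
    fn get(&self, id: &str) -> Option<&str> {
        self.cache.get(id).map(String::as_str)
    }
}
```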

Custom Storage Backend

Implement the Storage trait for any backend (Redis, PostgreSQL, etc.):

use a3s_lane::Storage;
use async_trait::async_trait;

#[async_trait]
impl Storage for MyRedisStorage {
    async fn save_command(&self, cmd: StoredCommand) -> Result<()> { /* ... */ }
    async fn load_commands(&self) -> Result<Vec<StoredCommand>> { /* ... */ }
    async fn remove_command(&self, id: &str) -> Result<()> { /* ... */ }
    async fn save_dead_letter(&self, letter: StoredDeadLetter) -> Result<()> { /* ... */ }
    async fn load_dead_letters(&self) -> Result<Vec<StoredDeadLetter>> { /* ... */ }
    async fn clear_dead_letters(&self) -> Result<()> { /* ... */ }
    async fn clear_all(&self) -> Result<()> { /* ... */ }
}
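Below is a synchronous, trimmed-down analogue of the trait with an in-memory backend, to show what each command method is responsible for. CommandStore, MemoryStore, and this StoredCommand are hypothetical. The real trait is async and also covers dead letters. The Mutex is needed because the methods take &self, as in the Storage trait above.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical, simplified record; the real StoredCommand differs.
#[derive(Debug, Clone, PartialEq)]
struct StoredCommand {
    id: String,
    lane_id: String,
    payload: String,
}

/// Synchronous stand-in for the command half of the Storage trait.
trait CommandStore {
    fn save_command(&self, cmd: StoredCommand) -> Result<(), String>;
    fn load_commands(&self) -> Result<Vec<StoredCommand>, String>;
    fn remove_command(&self, id: &str) -> Result<(), String>;
}

/// In-memory backend; the Mutex provides interior mutability behind &self,
/// the same shape a wrapper around a Redis or SQL client would need.
#[derive(Default)]
struct MemoryStore {
    commands: Mutex<HashMap<String, StoredCommand>>,
}

impl CommandStore for MemoryStore {
    fn save_command(&self, cmd: StoredCommand) -> Result<(), String> {
        let mut map = self.commands.lock().map_err(|e| e.to_string())?;
        map.insert(cmd.id.clone(), cmd); // upsert keyed by command id
        Ok(())
    }

    fn load_commands(&self) -> Result<Vec<StoredCommand>, String> {
        let map = self.commands.lock().map_err(|e| e.to_string())?;
        let mut cmds: Vec<StoredCommand> = map.values().cloned().collect();
        // Sorted by id only to make this sketch deterministic.
        cmds.sort_by(|a, b| a.id.cmp(&b.id));
        Ok(cmds)
    }

    fn remove_command(&self, id: &str) -> Result<(), String> {
        let mut map = self.commands.lock().map_err(|e| e.to_string())?;
        map.remove(id);
        Ok(())
    }
}
```

A recovery pass at startup would call load_commands and resubmit each entry, and remove_command would run once a command completes. That lifecycle is an assumption consistent with the crash-recovery goal, not documented behavior.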

Graceful Shutdown

Stop accepting new commands and wait for in-flight work to complete:

// 1. Stop accepting new commands
manager.shutdown().await;

// 2. Wait for in-flight commands to complete (with timeout)
match manager.drain(Duration::from_secs(30)).await {
    Ok(())  => println!("All commands completed"),
    Err(e)  => println!("Drain timed out: {}", e),
}

if manager.is_shutting_down() {
    println!("Queue is shutting down");
}

shutdown() sets an internal flag; after that, new submit() calls return LaneError::ShutdownInProgress. drain(timeout) polls every 100ms until all lanes report pending == 0 and active == 0, and returns an error if that has not happened before the timeout elapses.
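The shutdown flag and 100 ms drain poll can be sketched with atomics. LaneState and SubmitError are hypothetical stand-ins for the manager and LaneError::ShutdownInProgress. A single pending/active pair replaces the per-lane counters that the real drain checks.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical shared state: shutdown flag plus work counters.
#[derive(Default)]
struct LaneState {
    shutting_down: AtomicBool,
    pending: AtomicUsize,
    active: AtomicUsize,
}

#[derive(Debug, PartialEq)]
enum SubmitError {
    ShutdownInProgress,
}

impl LaneState {
    /// Reject new work once shutdown has been requested.
    fn submit(&self) -> Result<(), SubmitError> {
        if self.shutting_down.load(Ordering::SeqCst) {
            return Err(SubmitError::ShutdownInProgress);
        }
        self.pending.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    fn shutdown(&self) {
        self.shutting_down.store(true, Ordering::SeqCst);
    }

    /// Poll every 100 ms until nothing is pending or active, or time out.
    fn drain(&self, timeout: Duration) -> Result<(), String> {
        let deadline = Instant::now() + timeout;
        loop {
            if self.pending.load(Ordering::SeqCst) == 0
                && self.active.load(Ordering::SeqCst) == 0
            {
                return Ok(());
            }
            if Instant::now() >= deadline {
                return Err(format!("drain timed out after {:?}", timeout));
            }
            thread::sleep(Duration::from_millis(100));
        }
    }
}
```

In this sketch, a submit that passes the flag check just before shutdown() can still increment pending. A production queue would need to close that window, for example by checking the flag and counting under one lock.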

The Python and Node.js SDKs expose the same two steps.

Python:

from a3s_lane import Lane

lane = Lane()
lane.start()

# ... submit work ...

lane.shutdown()                # stop accepting new submits
lane.drain(timeout_secs=30.0)  # wait for in-flight to complete

Node.js:

const { Lane } = require('@a3s-lab/lane');

const lane = new Lane();
lane.start();

// ... submit work ...

lane.shutdown();       // stop accepting new submits
lane.drain(30_000);    // wait for in-flight to complete (timeout in ms)
