Reliability
Timeout, retry, dead letter queue, persistent storage, and graceful shutdown
A3S Lane provides multiple reliability layers: per-command timeouts, configurable retry policies, a dead letter queue for permanently failed commands, persistent storage for crash recovery, and graceful shutdown.
Timeout, retry, DLQ, and persistent storage are Rust-only features. Graceful shutdown (drain) is available in all SDKs.
Timeout
Set a timeout on a lane. It applies to every command in that lane, so no command runs indefinitely:
use std::time::Duration;
let config = LaneConfig::new(1, 10)
    .with_timeout(Duration::from_secs(30));
When a command exceeds the timeout, it fails with LaneError::Timeout(duration). If a retry policy is configured, the command is retried. Once retries are exhausted, or if none are configured, the command moves to the dead letter queue when one is enabled; otherwise the error is returned to the caller.
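The failure path described above can be modeled as a small decision function. This is a hypothetical sketch, not A3S Lane's internal code: `route_failure`, `Outcome`, and the local `LaneError` enum are illustrative names, and it assumes the retry count excludes the first attempt (so `max_retries = 3` allows four attempts in total).

```rust
use std::time::Duration;

// Hypothetical model of the routing rules; not the a3s_lane implementation.
#[derive(Debug, Clone, PartialEq)]
enum LaneError {
    Timeout(Duration),
}

/// What happens to a command after one of its attempts fails.
#[derive(Debug, PartialEq)]
enum Outcome {
    Retry,
    DeadLetter(LaneError),
    ReturnToCaller(LaneError),
}

/// `attempts` counts every attempt made so far, including the one that just failed.
fn route_failure(err: LaneError, attempts: u32, max_retries: u32, dlq_enabled: bool) -> Outcome {
    // The first attempt is not a retry, so up to `max_retries + 1` attempts are allowed.
    if attempts <= max_retries {
        Outcome::Retry
    } else if dlq_enabled {
        Outcome::DeadLetter(err)
    } else {
        Outcome::ReturnToCaller(err)
    }
}

fn main() {
    let err = LaneError::Timeout(Duration::from_secs(30));
    // First attempt timed out and 3 retries are configured: try again.
    println!("{:?}", route_failure(err.clone(), 1, 3, true));
    // Fourth attempt also timed out: retries are exhausted and the DLQ is enabled.
    println!("{:?}", route_failure(err, 4, 3, true));
}
```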
Retry Policies
Exponential Backoff
let config = LaneConfig::new(1, 10)
    .with_retry_policy(RetryPolicy::exponential(3));
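The examples here do not say which base delay, growth factor, or cap `RetryPolicy::exponential(3)` uses. The sketch below assumes a delay that doubles from a caller-chosen base and is capped at a maximum, and it treats the first argument of each constructor as the retry limit. `Policy` and `delay_before` are hypothetical names used only to contrast the three policy shapes.

```rust
use std::time::Duration;

// Hypothetical model of the three policy shapes; not the a3s_lane `RetryPolicy` type.
#[derive(Debug, Clone, Copy)]
enum Policy {
    None,
    Fixed { max_retries: u32, delay: Duration },
    Exponential { max_retries: u32, base: Duration, max_delay: Duration },
}

impl Policy {
    /// Delay before retry number `retry` (1-based), or `None` when no retry is allowed.
    fn delay_before(&self, retry: u32) -> Option<Duration> {
        match *self {
            Policy::None => None,
            Policy::Fixed { max_retries, delay } => {
                (retry >= 1 && retry <= max_retries).then_some(delay)
            }
            Policy::Exponential { max_retries, base, max_delay } => {
                if retry == 0 || retry > max_retries {
                    return None;
                }
                // base * 2^(retry - 1), saturating so large exponents cannot overflow.
                let factor = 2u32.saturating_pow(retry - 1);
                Some(base.saturating_mul(factor).min(max_delay))
            }
        }
    }
}

fn main() {
    let policies = [
        Policy::None,
        Policy::Fixed { max_retries: 5, delay: Duration::from_secs(1) },
        Policy::Exponential {
            max_retries: 3,
            base: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
        },
    ];
    for policy in policies {
        // Collect the delay schedule until the policy stops allowing retries.
        let delays: Vec<Duration> = (1..).map_while(|n| policy.delay_before(n)).collect();
        println!("{:?} -> {:?}", policy, delays);
    }
}
```

A fixed delay keeps the retry rate constant, while exponential backoff spaces retries further apart so a struggling downstream service gets time to recover.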
Fixed Delay
let config = LaneConfig::new(1, 10)
    .with_retry_policy(RetryPolicy::fixed(5, Duration::from_secs(1)));
No Retries (Default)
let config = LaneConfig::new(1, 10)
    .with_retry_policy(RetryPolicy::none());
Dead Letter Queue
Commands that exhaust all retries move to the dead letter queue (DLQ).
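The path from retries into the DLQ can be sketched with the standard library alone. The names here (`run_with_retries`, `DeadLetterQueue`) are hypothetical, the delay between retries is omitted, and the eviction rule is an assumption: `with_dlq(100)` only states a maximum of 100 dead letters, so this sketch drops the oldest letter when the queue is full.

```rust
use std::collections::VecDeque;

// Hypothetical dead-letter record; field names follow the inspection example in this section.
#[derive(Debug, Clone, PartialEq)]
struct DeadLetter {
    command_type: String,
    error: String,
    attempts: u32,
}

// Bounded DLQ sketch. Assumption: when full, the oldest letter is evicted.
struct DeadLetterQueue {
    max: usize,
    letters: VecDeque<DeadLetter>,
}

impl DeadLetterQueue {
    fn new(max: usize) -> Self {
        Self { max, letters: VecDeque::new() }
    }

    fn push(&mut self, letter: DeadLetter) {
        if self.max == 0 {
            return; // a zero-capacity queue keeps nothing
        }
        if self.letters.len() == self.max {
            self.letters.pop_front();
        }
        self.letters.push_back(letter);
    }

    fn len(&self) -> usize {
        self.letters.len()
    }
}

/// Run `op` once plus up to `max_retries` retries; on exhaustion, record a dead letter.
fn run_with_retries<T>(
    command_type: &str,
    max_retries: u32,
    dlq: &mut DeadLetterQueue,
    mut op: impl FnMut() -> Result<T, String>,
) -> Option<T> {
    let mut attempts = 0;
    loop {
        attempts += 1;
        match op() {
            Ok(value) => return Some(value),
            // Retries remain: try again (a real policy would wait here).
            Err(_) if attempts <= max_retries => continue,
            Err(error) => {
                dlq.push(DeadLetter { command_type: command_type.to_string(), error, attempts });
                return None;
            }
        }
    }
}

fn main() {
    let mut dlq = DeadLetterQueue::new(100);
    // Always fails: 1 attempt + 2 retries, then the command is dead-lettered.
    let result: Option<()> = run_with_retries("send_email", 2, &mut dlq, || Err("smtp timeout".to_string()));
    println!("{:?} {:?}", result, dlq.letters.front());
}
```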
Enable DLQ
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .with_dlq(100) // max 100 dead letters
    .build()
    .await?;
Inspect Dead Letters
if let Some(dlq) = manager.queue().dlq() {
    for letter in dlq.list().await {
        println!(
            "[{}] {} in '{}': {} (attempts: {})",
            letter.failed_at,
            letter.command_type,
            letter.lane_id,
            letter.error,
            letter.attempts,
        );
    }
}
DeadLetter Fields
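Each dead letter exposes the fields printed in the example above: failed_at, command_type, lane_id, error, and attempts.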
DLQ Operations
let letter = dlq.pop().await; // pop one (for retry or inspection)
dlq.clear().await; // clear all
let count = dlq.len().await;
let empty = dlq.is_empty().await;
Persistent Storage
Enable storage so pending commands survive process restarts.
Local Filesystem Storage
use std::path::PathBuf;
use std::sync::Arc;
let storage = Arc::new(
    LocalStorage::new(PathBuf::from("./queue-data")).await?
);
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .with_storage(storage)
    .with_dlq(100)
    .build()
    .await?;
LocalStorage persists commands as JSON files and keeps an in-memory cache for fast reads.
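To illustrate JSON files on disk backed by a read cache, here is a std-only sketch. `FileStore` is hypothetical and simpler than `LocalStorage`: it assumes one `<id>.json` file per command, writes caller-supplied strings instead of serializing a `StoredCommand`, and is synchronous.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// Hypothetical file-per-command store with an in-memory cache; not the a3s_lane LocalStorage.
struct FileStore {
    dir: PathBuf,
    cache: HashMap<String, String>,
}

impl FileStore {
    /// Open the directory and warm the cache from any files left by a previous run.
    fn open(dir: &Path) -> io::Result<Self> {
        fs::create_dir_all(dir)?;
        let mut cache = HashMap::new();
        for entry in fs::read_dir(dir)? {
            let path = entry?.path();
            if path.extension().and_then(|e| e.to_str()) == Some("json") {
                if let Some(id) = path.file_stem().and_then(|s| s.to_str()) {
                    cache.insert(id.to_string(), fs::read_to_string(&path)?);
                }
            }
        }
        Ok(Self { dir: dir.to_path_buf(), cache })
    }

    fn path_for(&self, id: &str) -> PathBuf {
        self.dir.join(format!("{id}.json"))
    }

    /// Write to disk first, then update the cache, so the cache never holds unsaved data.
    fn save(&mut self, id: &str, payload: &str) -> io::Result<()> {
        fs::write(self.path_for(id), payload)?;
        self.cache.insert(id.to_string(), payload.to_string());
        Ok(())
    }

    /// Reads are served from memory without touching the filesystem.
    fn get(&self, id: &str) -> Option<&String> {
        self.cache.get(id)
    }

    /// Delete the file (ignoring "already gone") and evict the cache entry.
    fn remove(&mut self, id: &str) -> io::Result<()> {
        match fs::remove_file(self.path_for(id)) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
            Err(e) => return Err(e),
        }
        self.cache.remove(id);
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("lane-store-sketch");
    let mut store = FileStore::open(&dir)?;
    store.save("cmd-1", r#"{"command_type":"send_email"}"#)?;
    // Simulate a restart: a fresh instance recovers cmd-1 from disk.
    let reopened = FileStore::open(&dir)?;
    println!("{:?}", reopened.get("cmd-1"));
    store.remove("cmd-1")?;
    Ok(())
}
```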
Custom Storage Backend
Implement the Storage trait for any backend (Redis, PostgreSQL, etc.):
use a3s_lane::Storage;
use async_trait::async_trait;
#[async_trait]
impl Storage for MyRedisStorage {
    async fn save_command(&self, cmd: StoredCommand) -> Result<()> { /* ... */ }
    async fn load_commands(&self) -> Result<Vec<StoredCommand>> { /* ... */ }
    async fn remove_command(&self, id: &str) -> Result<()> { /* ... */ }
    async fn save_dead_letter(&self, letter: StoredDeadLetter) -> Result<()> { /* ... */ }
    async fn load_dead_letters(&self) -> Result<Vec<StoredDeadLetter>> { /* ... */ }
    async fn clear_dead_letters(&self) -> Result<()> { /* ... */ }
    async fn clear_all(&self) -> Result<()> { /* ... */ }
}
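The trait above only stubs each method. The sketch below shows how the command methods cooperate during crash recovery, using a simplified synchronous `CommandStore` trait and an in-memory backend; both are hypothetical stand-ins for the async `Storage` trait. It assumes a command is removed from storage only after it completes, which is what lets an interrupted command be replayed on the next start.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

// Hypothetical stand-in for a persisted command (the real type is `StoredCommand`).
#[derive(Debug, Clone, PartialEq)]
struct PendingCommand {
    id: String,
    payload: String,
}

// Simplified, synchronous mirror of three `Storage` methods; not the real async trait.
trait CommandStore {
    fn save_command(&self, cmd: PendingCommand);
    fn load_commands(&self) -> Vec<PendingCommand>;
    fn remove_command(&self, id: &str);
}

// In-memory backend; a Redis or PostgreSQL backend would implement the same operations.
#[derive(Default)]
struct MemoryStore {
    commands: Mutex<BTreeMap<String, PendingCommand>>,
}

impl CommandStore for MemoryStore {
    fn save_command(&self, cmd: PendingCommand) {
        self.commands.lock().unwrap().insert(cmd.id.clone(), cmd);
    }
    fn load_commands(&self) -> Vec<PendingCommand> {
        self.commands.lock().unwrap().values().cloned().collect()
    }
    fn remove_command(&self, id: &str) {
        self.commands.lock().unwrap().remove(id);
    }
}

/// On startup, replay every command that was saved but never removed.
/// `execute` returns true on success; only then is the command deleted from storage.
fn recover(store: &dyn CommandStore, mut execute: impl FnMut(&PendingCommand) -> bool) -> usize {
    let mut completed = 0;
    for cmd in store.load_commands() {
        if execute(&cmd) {
            store.remove_command(&cmd.id);
            completed += 1;
        }
    }
    completed
}

fn main() {
    let store = MemoryStore::default();
    store.save_command(PendingCommand { id: "1".into(), payload: "a".into() });
    store.save_command(PendingCommand { id: "2".into(), payload: "b".into() });
    // Pretend the process crashed here; on restart, command "2" fails again and stays stored.
    let done = recover(&store, |cmd| cmd.id != "2");
    println!("recovered {} command(s), {} still pending", done, store.load_commands().len());
}
```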
Graceful Shutdown
Stop accepting new commands and wait for in-flight work to complete:
// 1. Stop accepting new commands
manager.shutdown().await;
// 2. Wait for in-flight commands to complete (with timeout)
match manager.drain(Duration::from_secs(30)).await {
    Ok(()) => println!("All commands completed"),
    Err(e) => println!("Drain timed out: {}", e),
}
if manager.is_shutting_down() {
    println!("Queue is shutting down");
}
shutdown() sets an internal flag, after which new submit() calls return LaneError::ShutdownInProgress. drain(timeout) then polls every 100 ms until every lane reports pending == 0 and active == 0, and returns an error if that has not happened before the timeout expires.
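The shutdown-then-drain sequence can be modeled with atomics and a polling loop. This std-only sketch is hypothetical (`Manager`, `LaneState`, and the string errors are illustrative, and it uses threads rather than async tasks), but it follows the behavior described above: a flag rejects new submissions, and `drain` checks the pending and active counters every 100 ms until they are all zero or the timeout expires.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical per-lane counters for queued and running commands.
#[derive(Default)]
struct LaneState {
    pending: AtomicUsize,
    active: AtomicUsize,
}

struct Manager {
    shutting_down: AtomicBool,
    lanes: Vec<Arc<LaneState>>,
}

impl Manager {
    /// Reject work once shutdown has started; otherwise count it as pending.
    fn submit(&self, lane: usize) -> Result<(), &'static str> {
        if self.shutting_down.load(Ordering::SeqCst) {
            return Err("shutdown in progress");
        }
        self.lanes[lane].pending.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    fn shutdown(&self) {
        self.shutting_down.store(true, Ordering::SeqCst);
    }

    fn is_idle(&self) -> bool {
        self.lanes.iter().all(|l| {
            l.pending.load(Ordering::SeqCst) == 0 && l.active.load(Ordering::SeqCst) == 0
        })
    }

    /// Poll every 100 ms until every lane is idle or `timeout` elapses.
    fn drain(&self, timeout: Duration) -> Result<(), String> {
        let deadline = Instant::now() + timeout;
        loop {
            if self.is_idle() {
                return Ok(());
            }
            if Instant::now() >= deadline {
                return Err(format!("drain timed out after {:?}", timeout));
            }
            thread::sleep(Duration::from_millis(100));
        }
    }
}

fn main() {
    let lane = Arc::new(LaneState::default());
    let manager = Manager { shutting_down: AtomicBool::new(false), lanes: vec![lane.clone()] };
    manager.submit(0).unwrap();
    // A worker picks up the command, runs it for 250 ms, then finishes.
    // `active` is raised before `pending` drops so the lane never looks idle mid-handoff.
    let worker = {
        let lane = lane.clone();
        thread::spawn(move || {
            lane.active.fetch_add(1, Ordering::SeqCst);
            lane.pending.fetch_sub(1, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(250));
            lane.active.fetch_sub(1, Ordering::SeqCst);
        })
    };
    manager.shutdown();
    assert!(manager.submit(0).is_err());
    println!("{:?}", manager.drain(Duration::from_secs(5)));
    worker.join().unwrap();
}
```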
Python SDK:
from a3s_lane import Lane
lane = Lane()
lane.start()
# ... submit work ...
lane.shutdown() # stop accepting new submits
lane.drain(timeout_secs=30.0) # wait for in-flight to complete
Node.js SDK:
const { Lane } = require('@a3s-lab/lane');
const lane = new Lane();
lane.start();
// ... submit work ...
lane.shutdown(); // stop accepting new submits
lane.drain(30_000); // wait for in-flight to complete (timeout in ms)