A3S Docs
A3S Lane

Events

Real-time event stream for monitoring command lifecycle and queue state

Events

A3S Lane emits events at every stage of the command lifecycle. EventStream implements futures_core::Stream, so it works with any stream combinator. Subscribe directly from the manager — no need to thread EventEmitter manually.

Subscribe from the Manager

use tokio_stream::StreamExt;

// All events
let mut stream = manager.subscribe();

// Filtered — predicate runs on the subscriber side
let mut failures = manager.subscribe_filtered(|e| {
    e.key == "queue.command.failed" || e.key == "queue.command.timeout"
});

tokio::spawn(async move {
    while let Some(event) = stream.next().await {
        println!("[{}] {}", event.timestamp, event.key);
    }
});

Both subscribe() and subscribe_filtered() return EventStream, which implements futures_core::Stream<Item = LaneEvent>.

EventStream as Stream

Use .next().await directly, or any stream combinator from StreamExt:

use tokio_stream::StreamExt;

// Convenience recv() method — same as .next().await
while let Some(event) = stream.recv().await {
    println!("{}", event.key);
}

// Timeout per event
while let Ok(Some(event)) = tokio::time::timeout(
    Duration::from_secs(5),
    stream.next(),
).await {
    println!("{}", event.key);
}

// Take first N
let first_ten: Vec<_> = stream.take(10).collect().await;

Subscribe from the Emitter

If you need to subscribe before the manager is built:

// EventStream (implements Stream) — all events
let mut stream = emitter.subscribe_stream();

// EventStream — filtered
let mut stream = emitter.subscribe_filtered(|e| e.key.starts_with("queue.command"));

// Raw broadcast::Receiver — legacy API
let mut rx = emitter.subscribe();
while let Ok(event) = rx.recv().await { /* ... */ }

Python and Node SDKs

from a3s_lane import Lane

lane = Lane()
lane.start()

# All events — returns EventStream
stream = lane.subscribe()

# recv() blocks until an event arrives; returns None on timeout
event = stream.recv(timeout_ms=5000)
if event:
    print(f"[{event.timestamp}] {event.key}")
    print(event.payload)  # dict

# Filtered — exact key match
failures = lane.subscribe_filtered([
    "queue.command.failed",
    "queue.command.timeout",
])

# Event loop
while True:
    event = failures.recv(timeout_ms=1000)
    if event is None:
        break
    print(event.key, event.payload)
const { Lane } = require('@a3s-lab/lane');

const lane = new Lane();
lane.start();

// All events — callback receives (err, event) for every event
lane.subscribe((err, event) => {
  if (err) throw err;
  console.log(`[${event.timestamp}] ${event.key}`);
  console.log(JSON.parse(event.payload));  // event.payload is a JSON string
});

// Filtered — exact key match
lane.subscribeFiltered(
  ['queue.command.failed', 'queue.command.timeout'],
  (err, event) => {
    if (err) throw err;
    console.error('failure:', event.key, JSON.parse(event.payload));
  }
);

Event Reference

Command Lifecycle

Prop

Type

Lane Pressure

Prop

Type

These events require with_pressure_threshold(n) on the lane config. See Lane pressure.

Shutdown

Prop

Type

LaneEvent Structure

pub struct LaneEvent {
    pub key: EventKey,            // dot-separated identifier
    pub payload: EventPayload,    // Empty | String(String) | Map(HashMap<String, Value>)
    pub timestamp: DateTime<Utc>,
}

Example: Lifecycle Logger

use tokio_stream::StreamExt;

let mut stream = manager.subscribe();

tokio::spawn(async move {
    while let Some(event) = stream.next().await {
        if let a3s_lane::EventPayload::Map(ref m) = event.payload {
            let lane = m.get("lane_id").and_then(|v| v.as_str()).unwrap_or("?");
            match event.key.as_str() {
                "queue.command.submitted"  => println!("+ queued  → {lane}"),
                "queue.command.started"    => println!("→ started → {lane}"),
                "queue.command.completed"  => println!("✓ done    ← {lane}"),
                "queue.command.failed"     => {
                    let err = m.get("error").and_then(|v| v.as_str()).unwrap_or("?");
                    println!("✗ failed  ← {lane}: {err}");
                }
                _ => {}
            }
        }
    }
});

On this page