A3S Lane
Events
Real-time event stream for monitoring command lifecycle and queue state
Events
A3S Lane emits events at every stage of the command lifecycle. EventStream implements futures_core::Stream, so it works with any stream combinator. Subscribe directly from the manager — no need to thread EventEmitter manually.
Subscribe from the Manager
use tokio_stream::StreamExt;
// All events
let mut stream = manager.subscribe();
// Filtered — predicate runs on the subscriber side
let mut failures = manager.subscribe_filtered(|e| {
e.key == "queue.command.failed" || e.key == "queue.command.timeout"
});
tokio::spawn(async move {
while let Some(event) = stream.next().await {
println!("[{}] {}", event.timestamp, event.key);
}
});Both subscribe() and subscribe_filtered() return EventStream, which implements futures_core::Stream<Item = LaneEvent>.
EventStream as Stream
Use .next().await directly, or any stream combinator from StreamExt:
use tokio_stream::StreamExt;
// Convenience recv() method — same as .next().await
while let Some(event) = stream.recv().await {
println!("{}", event.key);
}
// Timeout per event
while let Ok(Some(event)) = tokio::time::timeout(
Duration::from_secs(5),
stream.next(),
).await {
println!("{}", event.key);
}
// Take first N
let first_ten: Vec<_> = stream.take(10).collect().await;Subscribe from the Emitter
If you need to subscribe before the manager is built:
// EventStream (implements Stream) — all events
let mut stream = emitter.subscribe_stream();
// EventStream — filtered
let mut stream = emitter.subscribe_filtered(|e| e.key.starts_with("queue.command"));
// Raw broadcast::Receiver — legacy API
let mut rx = emitter.subscribe();
while let Ok(event) = rx.recv().await { /* ... */ }Python and Node SDKs
from a3s_lane import Lane
lane = Lane()
lane.start()
# All events — returns EventStream
stream = lane.subscribe()
# recv() blocks until an event arrives; returns None on timeout
event = stream.recv(timeout_ms=5000)
if event:
print(f"[{event.timestamp}] {event.key}")
print(event.payload) # dict
# Filtered — exact key match
failures = lane.subscribe_filtered([
"queue.command.failed",
"queue.command.timeout",
])
# Event loop
while True:
event = failures.recv(timeout_ms=1000)
if event is None:
break
print(event.key, event.payload)const { Lane } = require('@a3s-lab/lane');
const lane = new Lane();
lane.start();
// All events — callback receives (err, event) for every event
lane.subscribe((err, event) => {
if (err) throw err;
console.log(`[${event.timestamp}] ${event.key}`);
console.log(JSON.parse(event.payload)); // event.payload is a JSON string
});
// Filtered — exact key match
lane.subscribeFiltered(
['queue.command.failed', 'queue.command.timeout'],
(err, event) => {
if (err) throw err;
console.error('failure:', event.key, JSON.parse(event.payload));
}
);Event Reference
Command Lifecycle
Prop
Type
Lane Pressure
Prop
Type
These events require with_pressure_threshold(n) on the lane config. See Lane pressure.
Shutdown
Prop
Type
LaneEvent Structure
pub struct LaneEvent {
pub key: EventKey, // dot-separated identifier
pub payload: EventPayload, // Empty | String(String) | Map(HashMap<String, Value>)
pub timestamp: DateTime<Utc>,
}Example: Lifecycle Logger
use tokio_stream::StreamExt;
let mut stream = manager.subscribe();
tokio::spawn(async move {
while let Some(event) = stream.next().await {
if let a3s_lane::EventPayload::Map(ref m) = event.payload {
let lane = m.get("lane_id").and_then(|v| v.as_str()).unwrap_or("?");
match event.key.as_str() {
"queue.command.submitted" => println!("+ queued → {lane}"),
"queue.command.started" => println!("→ started → {lane}"),
"queue.command.completed" => println!("✓ done ← {lane}"),
"queue.command.failed" => {
let err = m.get("error").and_then(|v| v.as_str()).unwrap_or("?");
println!("✗ failed ← {lane}: {err}");
}
_ => {}
}
}
}
});