A3S Event
Observability
Lock-free metrics, tracing spans, and health checks
A3S Event provides three layers of observability: lock-free atomic metrics for dashboards, tracing spans for structured logging, and a health check API for readiness probes.
Metrics
EventMetrics uses lock-free atomic counters, so recording a metric never waits on a lock.
Reading Metrics
let metrics = bus.metrics();
let snapshot = metrics.snapshot();
println!("Published: {}", snapshot.publish_count);
println!("Publish errors: {}", snapshot.publish_errors);
println!("Subscribed: {}", snapshot.subscribe_count);
println!("Unsubscribed: {}", snapshot.unsubscribe_count);
println!("Dead lettered: {}", snapshot.dlq_count);
println!("Validation errors: {}", snapshot.validation_errors);
println!("Encrypted: {}", snapshot.encrypt_count);
println!("Decrypted: {}", snapshot.decrypt_count);
println!("Avg latency: {}µs", snapshot.avg_publish_latency_us);
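println!("Max latency: {}µs", snapshot.max_publish_latency_us);
MetricsSnapshot Fields
- publish_count: events published
- publish_errors: publish errors
- subscribe_count: subscriptions created
- unsubscribe_count: subscriptions removed
- dlq_count: events dead lettered
- validation_errors: validation errors
- encrypt_count: payloads encrypted
- decrypt_count: payloads decrypted
- avg_publish_latency_us: average publish latency, in microseconds
- max_publish_latency_us: maximum publish latency, in microseconds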
JSON Serialization
MetricsSnapshot serializes with camelCase field names for dashboard consumption:
let snapshot = bus.metrics().snapshot();
let json = serde_json::to_string_pretty(&snapshot)?;
{
  "publishCount": 1042,
  "publishErrors": 3,
  "subscribeCount": 5,
  "unsubscribeCount": 1,
  "dlqCount": 2,
  "validationErrors": 1,
  "encryptCount": 1042,
  "decryptCount": 500,
  "avgPublishLatencyUs": 146,
  "maxPublishLatencyUs": 8700
}
Reset Metrics
bus.metrics().reset();
Zeroes all counters. Useful for periodic scraping where you want delta values.
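To illustrate why zeroing on each scrape yields deltas, here is a minimal standalone sketch. `DeltaCounter` is a hypothetical type, not part of A3S Event, and the source does not say whether `reset()` is implemented this way. The sketch reads and zeroes a counter in one atomic `swap`, so each scrape returns only what accumulated since the previous one.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical counter used only for illustration (not an A3S Event type).
struct DeltaCounter(AtomicU64);

impl DeltaCounter {
    fn new() -> Self {
        Self(AtomicU64::new(0))
    }

    /// Count one event.
    fn incr(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    /// Return the value accumulated since the last scrape and zero the
    /// counter in the same atomic step, so no increment is counted twice.
    fn take(&self) -> u64 {
        self.0.swap(0, Ordering::Relaxed)
    }
}
```

A scraper that calls `take()` on a fixed interval reports per-interval counts directly, with no need to remember the previous total.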
Implementation Details
- All counters use AtomicU64 with Ordering::Relaxed for minimal overhead
- Max latency uses a lock-free CAS loop (compare-and-swap)
- Publish latency is measured end-to-end (validation + encryption + provider publish)
- Average latency is computed from cumulative latency / publish count
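The points above can be sketched with the standard library alone. `SketchMetrics` and its field names are hypothetical stand-ins rather than the actual EventMetrics internals. The sketch shows relaxed atomic counters, a compare-and-swap loop that tracks the maximum, and an average derived from cumulative latency divided by publish count.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for a lock-free metrics struct (illustrative names).
#[derive(Default)]
struct SketchMetrics {
    publish_count: AtomicU64,
    total_latency_us: AtomicU64,
    max_latency_us: AtomicU64,
}

impl SketchMetrics {
    /// Record one publish that took `latency_us` microseconds.
    fn record_publish(&self, latency_us: u64) {
        self.publish_count.fetch_add(1, Ordering::Relaxed);
        self.total_latency_us.fetch_add(latency_us, Ordering::Relaxed);

        // CAS loop: keep trying to raise the stored maximum while our
        // sample is still larger than the value we last observed.
        let mut current = self.max_latency_us.load(Ordering::Relaxed);
        while latency_us > current {
            match self.max_latency_us.compare_exchange_weak(
                current,
                latency_us,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                // Another thread changed the value (or the weak CAS failed
                // spuriously); retry against what is stored now.
                Err(observed) => current = observed,
            }
        }
    }

    /// Average latency = cumulative latency / publish count (0 if none).
    fn avg_latency_us(&self) -> u64 {
        let count = self.publish_count.load(Ordering::Relaxed);
        if count == 0 {
            0
        } else {
            self.total_latency_us.load(Ordering::Relaxed) / count
        }
    }

    /// Largest latency recorded so far.
    fn max_latency_us(&self) -> u64 {
        self.max_latency_us.load(Ordering::Relaxed)
    }
}
```

The standard library also provides `AtomicU64::fetch_max`, which performs the same update in one call. The explicit loop is shown here because it matches the CAS approach described above.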
Tracing Spans
A3S Event emits structured tracing spans on key operations:
// Enable tracing output (installs the default formatting subscriber)
tracing_subscriber::fmt::init();
Span Events
Example output with tracing-subscriber:
INFO event.publish{event_id="evt-abc" subject="events.orders.created" category="orders" provider="memory"}: a3s_event: publishing event
INFO Subscription updated subscriber="order-processor" subjects=["events.orders.>"] durable=true
Health Check
let is_healthy = bus.health().await?;
Use in Kubernetes readiness probes or load balancer health checks:
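// Assumes StatusCode from the `http` crate (axum and hyper re-export it)
use http::StatusCode;

async fn health_handler(bus: &EventBus) -> StatusCode {
    match bus.health().await {
        Ok(true) => StatusCode::OK,
        // Unhealthy or failed checks both report the service as unavailable
        _ => StatusCode::SERVICE_UNAVAILABLE,
    }
}
Combining All Three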
// 1. Tracing for structured logs
tracing_subscriber::fmt::init();
// 2. EventBus with metrics
let bus = EventBus::new(provider);
// 3. Expose metrics endpoint
async fn metrics_handler(bus: &EventBus) -> String {
    serde_json::to_string(&bus.metrics().snapshot()).unwrap()
}
// 4. Expose health endpoint
async fn health_handler(bus: &EventBus) -> bool {
    // Treat a failed health check as unhealthy
    bus.health().await.unwrap_or(false)
}