A3S Event
Quick Start
Get started with A3S Event — publish, subscribe, and query events
Quick Start
Installation
[dependencies]
a3s-event = "0.3"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

In-Memory (Development)
use a3s_event::{EventBus, Event, SubscriptionFilter};
use a3s_event::provider::memory::MemoryProvider;
/// Minimal in-memory example: builds an `EventBus` on top of `MemoryProvider`,
/// publishes a single event, then reads events back with `list_events`.
///
/// Any error from `publish` or `list_events` is propagated to the caller via `?`.
#[tokio::main]
async fn main() -> a3s_event::Result<()> {
    let bus = EventBus::new(MemoryProvider::default());

    // Publish an event
    let event = bus
        .publish(
            "orders",            // category
            "created",           // topic
            "New order ORD-001", // summary
            "order-service",     // source
            serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
        )
        .await?;
    println!("Published: {}", event.id);

    // Query history
    // (presumably `Some("orders")` filters by category and `100` caps the
    // number of results -- confirm against the `list_events` API docs)
    let events = bus.list_events(Some("orders"), 100).await?;
    println!("Order events: {}", events.len());

    Ok(())
}

NATS JetStream (Production)
Requires the `nats` feature (enabled by default).
use a3s_event::{EventBus, NatsProvider, NatsConfig, SubscriptionFilter};
// NOTE: this snippet uses `.await?`, so it must be placed inside an async
// function whose return type can carry the propagated error.

// Connect to NATS; fields not listed here keep their `Default` values.
let provider = NatsProvider::connect(NatsConfig {
    url: "nats://localhost:4222".to_string(),
    stream_name: "A3S_EVENTS".to_string(),
    ..Default::default()
})
.await?;
let bus = EventBus::new(provider);

// Register a durable subscription
bus.update_subscription(SubscriptionFilter {
    subscriber_id: "order-processor".to_string(),
    subjects: vec!["events.orders.>".to_string()],
    durable: true,
    options: None,
})
.await?;

// Create subscriber handles (looked up by the `subscriber_id` registered above)
let mut subs = bus.create_subscriber("order-processor").await?;

// Publish
bus.publish(
    "orders",
    "created",
    "New order ORD-001",
    "order-service",
    serde_json::json!({ "order_id": "ORD-001" }),
)
.await?;

// Receive events: poll each handle once and print the event if one was returned
for sub in &mut subs {
    if let Some(received) = sub.next().await? {
        println!("{}: {}", received.event.subject, received.event.summary);
    }
}

With All Features
use a3s_event::*;
use std::sync::Arc;
// NOTE: this snippet uses `?` and `.await?`, so it must be placed inside an
// async function whose return type can carry the propagated errors.

// Schema validation
let mut registry = MemorySchemaRegistry::new();
registry.register(EventSchema {
    event_type: "order.created".into(),
    version: 1,
    required_fields: vec!["order_id".into(), "total".into()],
    description: "Order creation event".into(),
})?;

// Build the bus with the schema registry attached. It is declared `mut`
// because the `set_*` calls below are made on it.
let mut bus = EventBus::with_schema_registry(MemoryProvider::default(), Arc::new(registry));

// Encryption (requires `encryption` feature)
// The 32-byte key literal is for illustration only -- do not hard-code real keys.
let encryptor = Aes256GcmEncryptor::new("key-v1", b"0123456789abcdef0123456789abcdef");
bus.set_encryptor(Arc::new(encryptor));

// Dead letter queue
let dlq = MemoryDlqHandler::new(100);
bus.set_dlq_handler(Arc::new(dlq));

// State persistence
let store = FileStateStore::new("./state/event-bus.json")?;
bus.set_state_store(Arc::new(store))?;

// Publish a typed event (validated against schema)
// The event type "order.created" and version 1 match the schema registered above.
let event = Event::typed(
    "events.orders.created",
    "orders",
    "order.created",
    1,
    "New order ORD-001",
    "order-service",
    serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
);
let seq = bus.publish_event(&event).await?;

// Check metrics
let snapshot = bus.metrics().snapshot();
println!("Published: {}, Errors: {}", snapshot.publish_count, snapshot.publish_errors);

// Health check
let healthy = bus.health().await?;