EventBus
High-level API for publishing, subscribing, and managing events
EventBus is the primary API for working with events. It wraps an EventProvider and adds schema validation, encryption, DLQ handling, state persistence, metrics, and optional broker routing.
Creating an EventBus
use a3s_event::{EventBus, MemoryProvider};
let bus = EventBus::new(MemoryProvider::default());
With schema validation:
use a3s_event::{EventBus, MemoryProvider, MemorySchemaRegistry};
use std::sync::Arc;
let registry = Arc::new(MemorySchemaRegistry::new());
let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
Publishing
Convenience Publish
Creates an Event automatically, with a generated id and timestamp:
let event = bus.publish(
    "orders",            // category
    "created",           // topic → subject becomes "events.orders.created"
    "New order ORD-001", // summary
    "order-service",     // source
    serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
).await?;
println!("Event ID: {}", event.id);     // evt-<uuid>
println!("Subject: {}", event.subject); // events.orders.created
The subject is built automatically by the provider's build_subject(category, topic) method.
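As a rough illustration of the subject construction described above, the sketch below joins a category and topic into a dotted subject. The function name build_subject_sketch is hypothetical, and the fixed "events." prefix is an assumption taken from the example output; a real provider's build_subject may use a different prefix or layout.

```rust
/// Hypothetical stand-in for a provider's `build_subject`; not the a3s_event code.
fn build_subject_sketch(category: &str, topic: &str) -> String {
    // Assumed layout, inferred from the example above: "events.<category>.<topic>".
    format!("events.{}.{}", category, topic)
}
```

Under that assumption, calling build_subject_sketch("orders", "created") yields the subject shown in the publish example.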
Publish a Pre-Built Event
use a3s_event::Event;
let event = Event::new(
    "events.orders.created",
    "orders",
    "New order ORD-001",
    "order-service",
    serde_json::json!({ "order_id": "ORD-001" }),
);
let sequence = bus.publish_event(&event).await?;
Typed Events (with Schema Validation)
let event = Event::typed(
    "events.orders.created", // subject
    "orders",                // category
    "order.created",         // event_type (matched against schema registry)
    1,                       // version
    "New order ORD-001",     // summary
    "order-service",         // source
    serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
);
let sequence = bus.publish_event(&event).await?;
Publish with Options
Provider-specific publish options (deduplication, optimistic concurrency, timeout):
use a3s_event::PublishOptions;
let opts = PublishOptions {
    msg_id: Some("dedup-123".into()), // Deduplication ID
    expected_sequence: Some(42),      // Optimistic concurrency
    timeout_secs: Some(5),            // Publish timeout
};
let sequence = bus.publish_event_with_options(&event, &opts).await?;
PublishOptions Fields
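Field: Purpose
msg_id: deduplication ID for the published message
expected_sequence: expected sequence, used for optimistic concurrency
timeout_secs: publish timeout in seconds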
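To make the deduplication and optimistic-concurrency options more concrete, here is a minimal in-memory sketch of how a stream might honor msg_id and expected_sequence. SketchStream and PublishOutcome are hypothetical names invented for this illustration, and the check order (duplicate check first, then sequence check) is an assumption; the actual behavior depends on the provider.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory stream; not part of a3s_event.
#[derive(Default)]
struct SketchStream {
    last_sequence: u64,
    seen_msg_ids: HashSet<String>,
}

#[derive(Debug, PartialEq)]
enum PublishOutcome {
    Stored(u64),
    Duplicate,
    WrongSequence { expected: u64, actual: u64 },
}

impl SketchStream {
    fn publish(&mut self, msg_id: Option<&str>, expected_sequence: Option<u64>) -> PublishOutcome {
        // Deduplication: a message ID that was already stored is not stored again.
        if let Some(id) = msg_id {
            if self.seen_msg_ids.contains(id) {
                return PublishOutcome::Duplicate;
            }
        }
        // Optimistic concurrency: reject if the stream is not at the expected sequence.
        if let Some(expected) = expected_sequence {
            if expected != self.last_sequence {
                return PublishOutcome::WrongSequence { expected, actual: self.last_sequence };
            }
        }
        // Accept the message: advance the sequence and remember its ID.
        self.last_sequence += 1;
        if let Some(id) = msg_id {
            self.seen_msg_ids.insert(id.to_string());
        }
        PublishOutcome::Stored(self.last_sequence)
    }
}
```

Checking for duplicates before the sequence keeps a retried publish idempotent in this sketch: a retry with the same msg_id is reported as a duplicate instead of failing the sequence check.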
Subscribing
A3S Event uses a two-step subscription model: register a filter, then create subscriber handles.
Step 1: Register a Subscription
use a3s_event::SubscriptionFilter;
bus.update_subscription(SubscriptionFilter {
    subscriber_id: "order-processor".to_string(),
    subjects: vec!["events.orders.>".to_string()],
    durable: true,
    options: None,
}).await?;
Step 2: Create Subscriber Handles
let mut subs = bus.create_subscriber("order-processor").await?;
for sub in &mut subs {
    // Auto-ack mode
    while let Some(received) = sub.next().await? {
        println!("{}: {}", received.event.subject, received.event.summary);
    }
}
Manual Ack Mode (NATS)
for sub in &mut subs {
    while let Some(pending) = sub.next_manual_ack().await? {
        match process(&pending.received.event).await {
            Ok(_) => pending.ack().await?,
            Err(_) => pending.nak().await?, // request redelivery
        }
    }
}
Subscribe with Options
use a3s_event::{SubscribeOptions, DeliverPolicy, SubscriptionFilter};
bus.update_subscription(SubscriptionFilter {
    subscriber_id: "replay-consumer".to_string(),
    subjects: vec!["events.orders.>".to_string()],
    durable: true,
    options: Some(SubscribeOptions {
        deliver_policy: DeliverPolicy::New,
        max_deliver: Some(3),
        ack_wait_secs: Some(30),
        max_ack_pending: Some(100),
        backoff_secs: vec![1, 5, 30],
    }),
}).await?;
SubscribeOptions Fields
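Field: Purpose
deliver_policy: where delivery starts (see Delivery Policies)
max_deliver: maximum number of delivery attempts per message
ack_wait_secs: seconds to wait for an acknowledgement before redelivery
max_ack_pending: maximum number of unacknowledged messages in flight
backoff_secs: redelivery backoff intervals in seconds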
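The interaction between max_deliver, ack_wait_secs, and backoff_secs can be sketched as a small helper that picks the delay before the next redelivery. This is only an illustration under stated assumptions: redelivery_delay is a hypothetical function, and the rules it encodes (stop once max_deliver is reached, fall back to ack_wait_secs when no backoff is configured, reuse the last backoff entry once the list is exhausted) are modeled on common NATS JetStream consumer behavior, not taken from the a3s_event source.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before the next redelivery, given how many times
/// the message has already been delivered. Returns None when no further
/// delivery should be attempted.
fn redelivery_delay(
    deliveries_so_far: u32,
    max_deliver: Option<u32>,
    ack_wait_secs: u64,
    backoff_secs: &[u64],
) -> Option<Duration> {
    // Assumed rule: stop once the delivery limit has been reached.
    if let Some(max) = max_deliver {
        if deliveries_so_far >= max {
            return None;
        }
    }
    let secs = if backoff_secs.is_empty() {
        // Assumed rule: without a backoff list, wait the plain ack timeout.
        ack_wait_secs
    } else {
        // Assumed rule: the n-th redelivery uses the n-th backoff entry,
        // and the last entry repeats once the list runs out.
        let idx = (deliveries_so_far.saturating_sub(1) as usize).min(backoff_secs.len() - 1);
        backoff_secs[idx]
    };
    Some(Duration::from_secs(secs))
}
```

With the values from the example above (max_deliver of 3 and backoff_secs of [1, 5, 30]), this sketch waits 1 second after the first delivery, 5 seconds after the second, and gives up after the third, so the 30-second entry is never reached.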
Delivery Policies
pub enum DeliverPolicy {
    All,                               // All messages from the beginning
    Last,                              // Last message only
    New,                               // Only new messages after subscribe
    ByStartSequence { sequence: u64 }, // From a specific stream sequence
    ByStartTime { timestamp: u64 },    // From a Unix timestamp (milliseconds)
    LastPerSubject,                    // Last message per subject
}
Subject Wildcards
Both providers support NATS-style wildcards:
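Wildcard: Meaning
*: matches exactly one token, e.g. events.*.created matches events.orders.created
>: matches one or more trailing tokens, e.g. events.orders.> matches events.orders.created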
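For readers unfamiliar with NATS-style wildcards, the matching rules can be sketched as follows: subjects are split into dot-separated tokens, `*` matches exactly one token, and `>` matches one or more trailing tokens and must be the last token of the pattern. The function subject_matches is a hypothetical illustration of those rules, not the matcher used by either provider.

```rust
/// Hypothetical NATS-style subject matcher; not the a3s_event implementation.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` consumes the rest of the subject; it needs at least one
            // remaining token and must be the final pattern token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` matches any single token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal tokens.
            _ => return false,
        }
    }
}
```

In this sketch the filter events.orders.> used in the subscription examples matches events.orders.created but not the bare subject events.orders, because `>` requires at least one further token.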
History & Querying
// Get recent events, optionally filtered by category
let events = bus.list_events(Some("orders"), 100).await?;
for event in &events {
    println!("[{}] {} - {}", event.timestamp, event.subject, event.summary);
}
// Count events by category
let counts = bus.counts(1000).await?;
println!("Total: {}, Orders: {:?}", counts.total, counts.categories.get("orders"));
Subscription Management
// List all registered subscriptions
let filters = bus.list_subscriptions().await;
// Get a specific subscription
let filter = bus.get_subscription("order-processor").await;
// Remove a subscription (also deletes the durable consumer)
bus.remove_subscription("order-processor").await?;
Configuration Setters
All setters require &mut self and should be called during setup before publishing:
let mut bus = EventBus::new(MemoryProvider::default());
// Optional: dead letter queue (DLQ) handling
bus.set_dlq_handler(Arc::new(MemoryDlqHandler::new(100)));
// Optional: encryption (requires `encryption` feature)
bus.set_encryptor(Arc::new(encryptor));
// Optional: state persistence
bus.set_state_store(Arc::new(store))?;
// Optional: broker routing (requires `routing` feature)
bus.set_broker(Arc::new(broker));
Health Check
let is_healthy = bus.health().await?;
Accessors
bus.provider_name(); // "memory" | "nats"
bus.provider(); // &dyn EventProvider
bus.provider_arc(); // Arc<dyn EventProvider>
bus.metrics(); // &EventMetrics
bus.schema_registry(); // Option<&dyn SchemaRegistry>
bus.dlq_handler(); // Option<&dyn DlqHandler>
bus.state_store(); // Option<&dyn StateStore>
bus.encryptor(); // Option<&dyn EventEncryptor> (encryption feature)
bus.broker(); // Option<&Broker> (routing feature)
bus.info().await?; // ProviderInfo