A3S Docs
A3S Event

EventBus

High-level API for publishing, subscribing, and managing events

EventBus

EventBus is the primary API for working with events. It wraps an EventProvider and adds schema validation, encryption, DLQ handling, state persistence, metrics, and optional broker routing.

Creating an EventBus

use a3s_event::{EventBus, MemoryProvider};

let bus = EventBus::new(MemoryProvider::default());

With schema validation:

use a3s_event::{EventBus, MemoryProvider, MemorySchemaRegistry};
use std::sync::Arc;

let registry = Arc::new(MemorySchemaRegistry::new());
let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);

Publishing

Convenience Publish

Creates an Event automatically with generated id and timestamp:

let event = bus.publish(
    "orders",           // category
    "created",          // topic → subject becomes "events.orders.created"
    "New order ORD-001", // summary
    "order-service",    // source
    serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
).await?;

println!("Event ID: {}", event.id);       // evt-<uuid>
println!("Subject: {}", event.subject);    // events.orders.created

The subject is built automatically using the provider's build_subject(category, topic) method.

Publish a Pre-Built Event

use a3s_event::Event;

let event = Event::new(
    "events.orders.created",
    "orders",
    "New order ORD-001",
    "order-service",
    serde_json::json!({ "order_id": "ORD-001" }),
);

let sequence = bus.publish_event(&event).await?;

Typed Events (with Schema Validation)

let event = Event::typed(
    "events.orders.created",  // subject
    "orders",                 // category
    "order.created",          // event_type (matched against schema registry)
    1,                        // version
    "New order ORD-001",      // summary
    "order-service",          // source
    serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
);

let sequence = bus.publish_event(&event).await?;

Publish with Options

Provider-specific publish options (deduplication, optimistic concurrency, timeout):

use a3s_event::PublishOptions;

let opts = PublishOptions {
    msg_id: Some("dedup-123".into()),     // Deduplication ID
    expected_sequence: Some(42),           // Optimistic concurrency
    timeout_secs: Some(5),                 // Publish timeout
};

let sequence = bus.publish_event_with_options(&event, &opts).await?;

PublishOptions Fields

Prop

Type

Subscribing

A3S Event uses a two-step subscription model: register a filter, then create subscriber handles.

Step 1: Register a Subscription

use a3s_event::SubscriptionFilter;

bus.update_subscription(SubscriptionFilter {
    subscriber_id: "order-processor".to_string(),
    subjects: vec!["events.orders.>".to_string()],
    durable: true,
    options: None,
}).await?;

Step 2: Create Subscriber Handles

let mut subs = bus.create_subscriber("order-processor").await?;

for sub in &mut subs {
    // Auto-ack mode
    while let Some(received) = sub.next().await? {
        println!("{}: {}", received.event.subject, received.event.summary);
    }
}

Manual Ack Mode (NATS)

for sub in &mut subs {
    while let Some(pending) = sub.next_manual_ack().await? {
        match process(&pending.received.event).await {
            Ok(_) => pending.ack().await?,
            Err(_) => pending.nak().await?,  // request redelivery
        }
    }
}

Subscribe with Options

use a3s_event::{SubscribeOptions, DeliverPolicy, SubscriptionFilter};

bus.update_subscription(SubscriptionFilter {
    subscriber_id: "replay-consumer".to_string(),
    subjects: vec!["events.orders.>".to_string()],
    durable: true,
    options: Some(SubscribeOptions {
        deliver_policy: DeliverPolicy::New,
        max_deliver: Some(3),
        ack_wait_secs: Some(30),
        max_ack_pending: Some(100),
        backoff_secs: vec![1, 5, 30],
    }),
}).await?;

SubscribeOptions Fields

Prop

Type

Delivery Policies

pub enum DeliverPolicy {
    All,                              // All messages from the beginning
    Last,                             // Last message only
    New,                              // Only new messages after subscribe
    ByStartSequence { sequence: u64 },// From a specific stream sequence
    ByStartTime { timestamp: u64 },   // From a Unix timestamp (milliseconds)
    LastPerSubject,                   // Last message per subject
}

Subject Wildcards

Both providers support NATS-style wildcards:

Prop

Type

History & Querying

// Get recent events, optionally filtered by category
let events = bus.list_events(Some("orders"), 100).await?;
for event in &events {
    println!("[{}] {} — {}", event.timestamp, event.subject, event.summary);
}

// Count events by category
let counts = bus.counts(1000).await?;
println!("Total: {}, Orders: {:?}", counts.total, counts.categories.get("orders"));

Subscription Management

// List all registered subscriptions
let filters = bus.list_subscriptions().await;

// Get a specific subscription
let filter = bus.get_subscription("order-processor").await;

// Remove a subscription (also deletes the durable consumer)
bus.remove_subscription("order-processor").await?;

Configuration Setters

All setters require &mut self and should be called during setup before publishing:

let mut bus = EventBus::new(MemoryProvider::default());

// Optional: schema validation
bus.set_dlq_handler(Arc::new(MemoryDlqHandler::new(100)));

// Optional: encryption (requires `encryption` feature)
bus.set_encryptor(Arc::new(encryptor));

// Optional: state persistence
bus.set_state_store(Arc::new(store))?;

// Optional: broker routing (requires `routing` feature)
bus.set_broker(Arc::new(broker));

Health Check

let is_healthy = bus.health().await?;

Prop

Type

Accessors

bus.provider_name();                // "memory" | "nats"
bus.provider();                     // &dyn EventProvider
bus.provider_arc();                 // Arc<dyn EventProvider>
bus.metrics();                      // &EventMetrics
bus.schema_registry();              // Option<&dyn SchemaRegistry>
bus.dlq_handler();                  // Option<&dyn DlqHandler>
bus.state_store();                  // Option<&dyn StateStore>
bus.encryptor();                    // Option<&dyn EventEncryptor> (encryption feature)
bus.broker();                       // Option<&Broker> (routing feature)
bus.info().await?;                  // ProviderInfo

On this page