A3S Docs
A3S Event

Quick Start

Get started with A3S Event — publish, subscribe, and query events

Quick Start

Installation

[dependencies]
a3s-event = "0.3"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

In-Memory (Development)

use a3s_event::{EventBus, Event, SubscriptionFilter};
use a3s_event::provider::memory::MemoryProvider;

/// Minimal walkthrough on a `MemoryProvider`-backed bus: publish a single
/// order event, then read the "orders" history back and print its size.
#[tokio::main]
async fn main() -> a3s_event::Result<()> {
    let bus = EventBus::new(MemoryProvider::default());

    // Build the JSON payload up front so the publish call stays on one line.
    let payload = serde_json::json!({ "order_id": "ORD-001", "total": 99.99 });

    // Arguments, in order: category, topic, summary, source, payload.
    let published = bus
        .publish("orders", "created", "New order ORD-001", "order-service", payload)
        .await?;
    println!("Published: {}", published.id);

    // Query history for the "orders" category
    // (the second argument, 100, is presumably the maximum number of results).
    let order_events = bus.list_events(Some("orders"), 100).await?;
    println!("Order events: {}", order_events.len());

    Ok(())
}

NATS JetStream (Production)

Requires the `nats` Cargo feature, which is enabled by default.

use a3s_event::{EventBus, NatsProvider, NatsConfig, SubscriptionFilter};

// Connection settings for a NATS server on localhost; every field not listed
// here keeps its `Default` value.
let nats_config = NatsConfig {
    url: String::from("nats://localhost:4222"),
    stream_name: String::from("A3S_EVENTS"),
    ..Default::default()
};
let bus = EventBus::new(NatsProvider::connect(nats_config).await?);

// Register the durable "order-processor" subscription before asking the bus
// for its subscriber handles.
let order_filter = SubscriptionFilter {
    subscriber_id: String::from("order-processor"),
    subjects: vec![String::from("events.orders.>")],
    durable: true,
    options: None,
};
bus.update_subscription(order_filter).await?;

// Subscriber handles for the subscription registered above.
let mut subs = bus.create_subscriber("order-processor").await?;

// Publish one order event: category, topic, summary, source, payload.
let payload = serde_json::json!({ "order_id": "ORD-001" });
bus.publish("orders", "created", "New order ORD-001", "order-service", payload)
    .await?;

// Poll each handle once and print the event it yields, if any.
for sub in &mut subs {
    let Some(received) = sub.next().await? else {
        continue;
    };
    println!("{}: {}", received.event.subject, received.event.summary);
}

With All Features

use a3s_event::*;
use std::sync::Arc;

// Schema validation: register the `order.created` v1 schema first, so the
// registry can be handed to the bus when it is constructed.
let mut registry = MemorySchemaRegistry::new();
registry.register(EventSchema {
    event_type: "order.created".into(),
    version: 1,
    required_fields: vec!["order_id".into(), "total".into()],
    description: "Order creation event".into(),
})?;

// Build the bus exactly once, already wired to the schema registry.
// It is bound as `mut` because the `set_*` calls below need a mutable binding.
let mut bus = EventBus::with_schema_registry(MemoryProvider::default(), Arc::new(registry));

// Encryption (requires `encryption` feature)
// NOTE: the inline 32-byte key is for demonstration only; do not hard-code
// real keys in source.
let encryptor = Aes256GcmEncryptor::new("key-v1", b"0123456789abcdef0123456789abcdef");
bus.set_encryptor(Arc::new(encryptor));

// Dead letter queue
let dlq = MemoryDlqHandler::new(100);
bus.set_dlq_handler(Arc::new(dlq));

// State persistence
let store = FileStateStore::new("./state/event-bus.json")?;
bus.set_state_store(Arc::new(store))?;

// Publish a typed event (validated against schema).
// The event type ("order.created") and version (1) match the schema
// registered above; the remaining arguments are presumably subject, category,
// summary, source and payload, in that order (confirm against `Event::typed`).
let event = Event::typed(
    "events.orders.created",
    "orders",
    "order.created",
    1,
    "New order ORD-001",
    "order-service",
    serde_json::json!({ "order_id": "ORD-001", "total": 99.99 }),
);
let seq = bus.publish_event(&event).await?;

// Check metrics
let snapshot = bus.metrics().snapshot();
println!("Published: {}, Errors: {}", snapshot.publish_count, snapshot.publish_errors);

// Health check
let healthy = bus.health().await?;

On this page