Providers

MemoryProvider for development and NatsProvider for production

A3S Event uses the EventProvider trait to abstract the underlying messaging system. Two implementations are included: MemoryProvider for development and NatsProvider for production.

EventProvider Trait

#[async_trait]
pub trait EventProvider: Send + Sync {
    /// Publish an event, returning the provider-assigned sequence number
    async fn publish(&self, event: &Event) -> Result<u64>;

    /// Create a durable subscription (survives reconnects)
    async fn subscribe_durable(
        &self, consumer_name: &str, filter_subject: &str,
    ) -> Result<Box<dyn Subscription>>;

    /// Create an ephemeral subscription
    async fn subscribe(&self, filter_subject: &str) -> Result<Box<dyn Subscription>>;

    /// Fetch historical events
    async fn history(&self, filter_subject: Option<&str>, limit: usize) -> Result<Vec<Event>>;

    /// Delete a durable subscription by consumer name
    async fn unsubscribe(&self, consumer_name: &str) -> Result<()>;

    /// Get provider info (message count, bytes, consumers)
    async fn info(&self) -> Result<ProviderInfo>;

    /// Subject prefix for this provider (e.g., "events")
    fn subject_prefix(&self) -> &str;

    /// Provider name (e.g., "nats", "memory")
    fn name(&self) -> &str;

    // Default implementations (override for provider-specific behavior):
    fn build_subject(&self, category: &str, topic: &str) -> String { ... }
    fn category_subject(&self, category: &str) -> String { ... }
    async fn publish_with_options(&self, event: &Event, opts: &PublishOptions) -> Result<u64> { ... }
    async fn subscribe_durable_with_options(...) -> Result<Box<dyn Subscription>> { ... }
    async fn subscribe_with_options(...) -> Result<Box<dyn Subscription>> { ... }
    async fn health(&self) -> Result<bool> { ... }
}
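The default subject helpers are elided in the listing above. As a rough sketch of what they might do, the following assumes a dot-separated `<prefix>.<category>.<topic>` layout, which is consistent with the "events.orders.>" filter used later on this page. The `SubjectBuilder` trait and `Demo` type are hypothetical stand-ins, not part of a3s_event.

```rust
/// Hypothetical stand-in for the subject helpers on `EventProvider`.
trait SubjectBuilder {
    /// Subject prefix, e.g. "events".
    fn subject_prefix(&self) -> &str;

    /// Assumed layout: `<prefix>.<category>.<topic>`.
    fn build_subject(&self, category: &str, topic: &str) -> String {
        format!("{}.{}.{}", self.subject_prefix(), category, topic)
    }

    /// Assumed layout: `<prefix>.<category>.>`, covering every topic in the category.
    fn category_subject(&self, category: &str) -> String {
        format!("{}.{}.>", self.subject_prefix(), category)
    }
}

/// Minimal implementor that only supplies the prefix.
struct Demo;

impl SubjectBuilder for Demo {
    fn subject_prefix(&self) -> &str {
        "events"
    }
}
```

Under these assumptions an implementor only supplies the prefix and inherits both helpers, which is the same shape the real trait uses for its default methods.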

MemoryProvider

In-process event bus using tokio::sync::broadcast. Zero external dependencies.

use a3s_event::MemoryProvider;

// Default configuration
let provider = MemoryProvider::default();

With custom configuration:

use a3s_event::provider::memory::{MemoryProvider, MemoryConfig};

let provider = MemoryProvider::new(MemoryConfig {
    subject_prefix: "events".to_string(),
    max_events: 100_000,
    channel_capacity: 10_000,
});

MemoryConfig Fields

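The example above sets every field:

  • subject_prefix ("events" in the example)
  • max_events (100_000 in the example)
  • channel_capacity (10_000 in the example)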
Characteristics

  • Events stored in an in-memory buffer (configurable size)
  • Broadcast channel for pub/sub
  • Wildcard matching (>, *) implemented in Rust
  • No durable subscriptions (falls back to ephemeral)
  • Publish/subscribe options accepted but ignored
  • Health check always returns true
  • Ideal for unit tests and local development
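The wildcard matching mentioned in the list can be sketched as follows, assuming the usual NATS subject rules: tokens are separated by dots, `*` matches exactly one token, and `>` matches one or more trailing tokens. The function name `subject_matches` is hypothetical and this is not the MemoryProvider source.

```rust
/// Returns true when `subject` matches `pattern` under NATS-style rules:
/// `*` matches exactly one token, `>` matches one or more trailing tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` must be the last pattern token and needs at least one subject token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one subject token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both sides exhausted together: full match.
            (None, None) => return true,
            // Length mismatch or differing literal token.
            _ => return false,
        }
    }
}
```

With this sketch, "events.orders.>" matches "events.orders.created" but not "events.orders", and "events.*.created" matches "events.orders.created" but not "events.orders.eu.created".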

NatsProvider

Requires the nats feature (enabled by default).

Production-grade provider using NATS JetStream.

use a3s_event::{NatsProvider, NatsConfig, StorageType};

let provider = NatsProvider::connect(NatsConfig {
    url: "nats://localhost:4222".to_string(),
    stream_name: "A3S_EVENTS".to_string(),
    subject_prefix: "events".to_string(),
    storage: StorageType::File,
    max_events: 100_000,
    max_age_secs: 604_800,  // 7 days
    ..Default::default()
}).await?;

NatsConfig Fields

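The example above sets the following fields; the remaining ones come from Default::default():

  • url ("nats://localhost:4222" in the example)
  • stream_name ("A3S_EVENTS" in the example)
  • subject_prefix ("events" in the example)
  • storage (StorageType::File in the example)
  • max_events (100_000 in the example)
  • max_age_secs (604_800, i.e. 7 days, in the example)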

NATS-Specific Features

Deduplication:

use a3s_event::PublishOptions;

let opts = PublishOptions {
    msg_id: Some("order-001".into()),
    ..Default::default()
};
bus.publish_event_with_options(&event, &opts).await?;
// Duplicate publishes with the same msg_id are ignored within the dedup window
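To illustrate the dedup-window behaviour described in the comment above, here is a minimal in-memory model. It is only a sketch of the idea: the `DedupWindow` type is hypothetical, the window length is arbitrary, and the real deduplication is performed by the provider rather than by application code.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical in-memory model of a dedup window keyed by message ID.
struct DedupWindow {
    window: Duration,
    seen: HashMap<String, Instant>,
}

impl DedupWindow {
    fn new(window: Duration) -> Self {
        Self { window, seen: HashMap::new() }
    }

    /// Returns true if the message should be stored, false if it is a
    /// duplicate of an ID first seen less than `window` ago.
    fn accept(&mut self, msg_id: &str, now: Instant) -> bool {
        // Forget IDs whose window has elapsed.
        let window = self.window;
        self.seen
            .retain(|_, first_seen| now.duration_since(*first_seen) < window);
        if self.seen.contains_key(msg_id) {
            return false;
        }
        self.seen.insert(msg_id.to_string(), now);
        true
    }
}
```

Passing `now` explicitly keeps the sketch deterministic: the same ID is rejected while its window is open and accepted again once the window has elapsed.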

Manual Acknowledgement:

let mut subs = bus.create_subscriber("order-processor").await?;
for sub in &mut subs {
    while let Some(pending) = sub.next_manual_ack().await? {
        match process(&pending.received.event).await {
            Ok(()) => pending.ack().await?,
            Err(_) => pending.nak().await?,  // triggers redelivery
        }
    }
}
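The ack/nak loop above relies on the provider redelivering any message that was nak'ed. The following toy model shows that control flow with an in-memory queue. `Verdict`, `deliver_all`, and the `max_deliveries` cap are hypothetical names chosen for this sketch and say nothing about how NatsProvider or JetStream schedule redelivery.

```rust
use std::collections::VecDeque;

/// Hypothetical handler verdict, mirroring ack() and nak().
enum Verdict {
    Ack,
    Nak,
}

/// Delivers each message until the handler acks it or `max_deliveries`
/// attempts are used up. Returns (acked, dropped) message lists.
fn deliver_all<F>(
    messages: &[&str],
    max_deliveries: u32,
    mut handler: F,
) -> (Vec<String>, Vec<String>)
where
    F: FnMut(&str, u32) -> Verdict,
{
    // Each queue entry carries the message and its delivery attempt number.
    let mut queue: VecDeque<(String, u32)> =
        messages.iter().map(|m| (m.to_string(), 1)).collect();
    let mut acked = Vec::new();
    let mut dropped = Vec::new();

    while let Some((msg, attempt)) = queue.pop_front() {
        match handler(&msg, attempt) {
            Verdict::Ack => acked.push(msg),
            // A nak puts the message back for another attempt.
            Verdict::Nak if attempt < max_deliveries => {
                queue.push_back((msg, attempt + 1))
            }
            // Attempts exhausted: give up on this message.
            Verdict::Nak => dropped.push(msg),
        }
    }
    (acked, dropped)
}
```

A handler that fails once and then succeeds sees the same message twice, which is why handlers used with manual acknowledgement should tolerate repeated deliveries.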

Delivery Policies:

use a3s_event::{SubscribeOptions, DeliverPolicy, SubscriptionFilter};

bus.update_subscription(SubscriptionFilter {
    subscriber_id: "replay".to_string(),
    subjects: vec!["events.orders.>".to_string()],
    durable: true,
    options: Some(SubscribeOptions {
        deliver_policy: DeliverPolicy::ByStartSequence { sequence: 100 },
        ..Default::default()
    }),
}).await?;
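As a sketch of what a start-sequence policy means for a replaying consumer, the model below filters stored sequence numbers. It assumes the start sequence is inclusive, as in JetStream. `StartPolicy` and `replay` are hypothetical and much smaller than the real DeliverPolicy type.

```rust
/// Hypothetical, reduced version of a delivery policy.
enum StartPolicy {
    /// Deliver everything retained in the stream.
    All,
    /// Deliver from the given stream sequence onward (inclusive).
    ByStartSequence { sequence: u64 },
}

/// Returns the sequence numbers a new consumer would receive.
fn replay(stored: &[u64], policy: &StartPolicy) -> Vec<u64> {
    match policy {
        StartPolicy::All => stored.to_vec(),
        StartPolicy::ByStartSequence { sequence } => stored
            .iter()
            .copied()
            .filter(|seq| seq >= sequence)
            .collect(),
    }
}
```

With stored sequences 98 through 102 and a start sequence of 100, the consumer in this model receives 100, 101, and 102.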

Custom Provider

Implement EventProvider for any messaging backend:

use a3s_event::provider::{EventProvider, Subscription, ProviderInfo};
use a3s_event::types::Event;
use a3s_event::Result;
use async_trait::async_trait;

pub struct RedisProvider { /* ... */ }

#[async_trait]
impl EventProvider for RedisProvider {
    async fn publish(&self, event: &Event) -> Result<u64> { todo!() }

    async fn subscribe_durable(
        &self, consumer_name: &str, filter_subject: &str,
    ) -> Result<Box<dyn Subscription>> { todo!() }

    async fn subscribe(&self, filter_subject: &str) -> Result<Box<dyn Subscription>> { todo!() }

    async fn history(
        &self, filter_subject: Option<&str>, limit: usize,
    ) -> Result<Vec<Event>> { todo!() }

    async fn unsubscribe(&self, consumer_name: &str) -> Result<()> { todo!() }
    async fn info(&self) -> Result<ProviderInfo> { todo!() }

    // subject_prefix() and name() are the only required non-async methods.
    // build_subject(), category_subject(), the *_with_options() methods,
    // and health() have default implementations.
    fn subject_prefix(&self) -> &str { "events" }
    fn name(&self) -> &str { "redis" }
}

Then use it like any other provider:

let bus = EventBus::new(RedisProvider::new(config));
bus.publish("orders", "created", "New order", "src", payload).await?;

Responsibility Boundaries

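Capabilities such as deduplication, manual acknowledgement, and delivery policies are delegated to the provider, not the EventBus. NatsProvider supports them through NATS JetStream, while MemoryProvider accepts the corresponding options and ignores them.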
