A3S Event
Providers
MemoryProvider for development and NatsProvider for production
Providers
A3S Event uses the EventProvider trait to abstract the underlying messaging system. Two implementations are included: MemoryProvider for development and NatsProvider for production.
EventProvider Trait
#[async_trait]
pub trait EventProvider: Send + Sync {
/// Publish an event, returning the provider-assigned sequence number
async fn publish(&self, event: &Event) -> Result<u64>;
/// Create a durable subscription (survives reconnects)
async fn subscribe_durable(
&self, consumer_name: &str, filter_subject: &str,
) -> Result<Box<dyn Subscription>>;
/// Create an ephemeral subscription
async fn subscribe(&self, filter_subject: &str) -> Result<Box<dyn Subscription>>;
/// Fetch historical events
async fn history(&self, filter_subject: Option<&str>, limit: usize) -> Result<Vec<Event>>;
/// Delete a durable subscription by consumer name
async fn unsubscribe(&self, consumer_name: &str) -> Result<()>;
/// Get provider info (message count, bytes, consumers)
async fn info(&self) -> Result<ProviderInfo>;
/// Subject prefix for this provider (e.g., "events")
fn subject_prefix(&self) -> &str;
/// Provider name (e.g., "nats", "memory")
fn name(&self) -> &str;
// Default implementations (override for provider-specific behavior):
fn build_subject(&self, category: &str, topic: &str) -> String { ... }
fn category_subject(&self, category: &str) -> String { ... }
async fn publish_with_options(&self, event: &Event, opts: &PublishOptions) -> Result<u64> { ... }
async fn subscribe_durable_with_options(...) -> Result<Box<dyn Subscription>> { ... }
async fn subscribe_with_options(...) -> Result<Box<dyn Subscription>> { ... }
async fn health(&self) -> Result<bool> { ... }
}MemoryProvider
In-process event bus using tokio::sync::broadcast. Zero external dependencies.
use a3s_event::MemoryProvider;
// Default configuration
let provider = MemoryProvider::default();With custom configuration:
use a3s_event::provider::memory::{MemoryProvider, MemoryConfig};
let provider = MemoryProvider::new(MemoryConfig {
subject_prefix: "events".to_string(),
max_events: 100_000,
channel_capacity: 10_000,
});MemoryConfig Fields
Prop
Type
Characteristics
- Events stored in an in-memory buffer (configurable size)
- Broadcast channel for pub/sub
- Wildcard matching (
>,*) implemented in Rust - No durable subscriptions (falls back to ephemeral)
- Publish/subscribe options accepted but ignored
- Health check always returns
true - Ideal for unit tests and local development
NatsProvider
Requires the nats feature (enabled by default).
Production-grade provider using NATS JetStream.
use a3s_event::{NatsProvider, NatsConfig, StorageType};
let provider = NatsProvider::connect(NatsConfig {
url: "nats://localhost:4222".to_string(),
stream_name: "A3S_EVENTS".to_string(),
subject_prefix: "events".to_string(),
storage: StorageType::File,
max_events: 100_000,
max_age_secs: 604_800, // 7 days
..Default::default()
}).await?;NatsConfig Fields
Prop
Type
NATS-Specific Features
Deduplication:
use a3s_event::PublishOptions;
let opts = PublishOptions {
msg_id: Some("order-001".into()),
..Default::default()
};
bus.publish_event_with_options(&event, &opts).await?;
// Duplicate publishes with the same msg_id are ignored within the dedup windowManual Acknowledgement:
let mut subs = bus.create_subscriber("order-processor").await?;
for sub in &mut subs {
while let Some(pending) = sub.next_manual_ack().await? {
match process(&pending.received.event).await {
Ok(()) => pending.ack().await?,
Err(_) => pending.nak().await?, // triggers redelivery
}
}
}Delivery Policies:
use a3s_event::{SubscribeOptions, DeliverPolicy, SubscriptionFilter};
bus.update_subscription(SubscriptionFilter {
subscriber_id: "replay".to_string(),
subjects: vec!["events.orders.>".to_string()],
durable: true,
options: Some(SubscribeOptions {
deliver_policy: DeliverPolicy::ByStartSequence { sequence: 100 },
..Default::default()
}),
}).await?;Custom Provider
Implement EventProvider for any messaging backend:
use a3s_event::provider::{EventProvider, Subscription, ProviderInfo};
use a3s_event::types::Event;
use a3s_event::Result;
use async_trait::async_trait;
pub struct RedisProvider { /* ... */ }
#[async_trait]
impl EventProvider for RedisProvider {
async fn publish(&self, event: &Event) -> Result<u64> { todo!() }
async fn subscribe_durable(
&self, consumer_name: &str, filter_subject: &str,
) -> Result<Box<dyn Subscription>> { todo!() }
async fn subscribe(&self, filter_subject: &str) -> Result<Box<dyn Subscription>> { todo!() }
async fn history(
&self, filter_subject: Option<&str>, limit: usize,
) -> Result<Vec<Event>> { todo!() }
async fn unsubscribe(&self, consumer_name: &str) -> Result<()> { todo!() }
async fn info(&self) -> Result<ProviderInfo> { todo!() }
// Only subject_prefix() and name() are required.
// build_subject() and category_subject() have default implementations.
fn subject_prefix(&self) -> &str { "events" }
fn name(&self) -> &str { "redis" }
}Then use it like any other provider:
let bus = EventBus::new(RedisProvider::new(config));
bus.publish("orders", "created", "New order", "src", payload).await?;Responsibility Boundaries
These capabilities are delegated to the provider, not the EventBus:
Prop
Type