A3S Event
State Persistence
Save and restore subscription filters across restarts
State Persistence
A3S Event can persist subscription filters to disk so that subscriptions are automatically restored after a process restart.
Enable State Persistence
File-Based (Production)
use a3s_event::{EventBus, MemoryProvider, FileStateStore};
use std::sync::Arc;
let mut bus = EventBus::new(MemoryProvider::default());
let store = FileStateStore::new("./state/event-bus.json");
bus.set_state_store(Arc::new(store))?;set_state_store() immediately loads any previously saved subscriptions and re-registers them.
Memory-Based (Testing)
use a3s_event::MemoryStateStore;
let store = MemoryStateStore::new();
bus.set_state_store(Arc::new(store))?;How It Works
Process start → set_state_store() → load saved filters → re-register subscriptions → ready
↓
Process running → update_subscription() / remove_subscription() → auto-save to store- On
set_state_store()— Loads saved subscriptions from the store and populates the in-memory map - On
update_subscription()— Saves the full subscription map to the store - On
remove_subscription()— Saves the updated map to the store
StateStore Trait
Implement custom state backends:
use a3s_event::{StateStore, SubscriptionFilter};
use a3s_event::Result;
use std::collections::HashMap;
pub trait StateStore: Send + Sync {
/// Save all subscription filters (keyed by subscriber_id)
fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()>;
/// Load all subscription filters
fn load(&self) -> Result<HashMap<String, SubscriptionFilter>>;
}Custom Example: Redis State Store
struct RedisStateStore {
client: redis::Client,
key: String,
}
impl StateStore for RedisStateStore {
fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
let json = serde_json::to_string(subscriptions)?;
// redis SET (blocking — use in sync context or wrap with block_on)
self.client.set(&self.key, &json)?;
Ok(())
}
fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
let json: Option<String> = self.client.get(&self.key)?;
match json {
Some(data) => Ok(serde_json::from_str(&data)?),
None => Ok(HashMap::new()),
}
}
}FileStateStore Details
- Format: Pretty-printed JSON (human-readable)
- Atomicity: Writes to a
.tmpfile then renames (prevents partial writes on crash) - Directory: Auto-creates parent directories if they don't exist
- Path: Configurable, e.g.,
./state/event-bus.json
Example persisted state:
{
"order-processor": {
"subscriberId": "order-processor",
"subjects": ["events.orders.>"],
"durable": true
},
"analytics": {
"subscriberId": "analytics",
"subjects": ["events.>"],
"durable": false
}
}Accessors
// Check if a state store is configured
if let Some(store) = bus.state_store() {
let filters = store.load()?;
println!("{} persisted subscriptions", filters.len());
}