State Persistence

Save and restore subscription filters across restarts

A3S Event can persist subscription filters to disk so that subscriptions are automatically restored after a process restart.

Enable State Persistence

File-Based (Production)

use a3s_event::{EventBus, MemoryProvider, FileStateStore};
use std::sync::Arc;

let mut bus = EventBus::new(MemoryProvider::default());

let store = FileStateStore::new("./state/event-bus.json");
bus.set_state_store(Arc::new(store))?;

set_state_store() immediately loads any previously saved subscriptions and re-registers them.

Memory-Based (Testing)

use a3s_event::MemoryStateStore;
use std::sync::Arc;

// `bus` is the EventBus created in the file-based example above
let store = MemoryStateStore::new();
bus.set_state_store(Arc::new(store))?;

How It Works

Process start → set_state_store() → load saved filters → re-register subscriptions → ready

Process running → update_subscription() / remove_subscription() → auto-save to store
  1. On set_state_store() — Loads saved subscriptions from the store and populates the in-memory map
  2. On update_subscription() — Saves the full subscription map to the store
  3. On remove_subscription() — Saves the updated map to the store
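
The three steps above can be modelled in a few lines of plain Rust. This is only a sketch of the described behaviour, not the a3s_event implementation: `ToyBus`, `MemStore`, `Filter`, the `Store` trait, and the `String` error type are hypothetical stand-ins, and the choice to merge loaded entries into the map is an assumption.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for a subscription filter (hypothetical type).
#[derive(Clone, Debug, PartialEq)]
pub struct Filter {
    pub subjects: Vec<String>,
    pub durable: bool,
}

type Filters = HashMap<String, Filter>;

/// Same shape as the documented trait: save the whole map, load the whole map.
pub trait Store: Send + Sync {
    fn save(&self, subs: &Filters) -> Result<(), String>;
    fn load(&self) -> Result<Filters, String>;
}

/// In-memory store used to simulate state that survives a restart.
#[derive(Default)]
pub struct MemStore {
    saved: Mutex<Filters>,
}

impl Store for MemStore {
    fn save(&self, subs: &Filters) -> Result<(), String> {
        let mut guard = self.saved.lock().map_err(|e| e.to_string())?;
        *guard = subs.clone();
        Ok(())
    }

    fn load(&self) -> Result<Filters, String> {
        let guard = self.saved.lock().map_err(|e| e.to_string())?;
        Ok(guard.clone())
    }
}

/// Toy bus that only tracks filters; it delivers no messages.
#[derive(Default)]
pub struct ToyBus {
    subs: Filters,
    store: Option<Arc<dyn Store>>,
}

impl ToyBus {
    /// Step 1: attach the store and populate the in-memory map from it.
    pub fn set_state_store(&mut self, store: Arc<dyn Store>) -> Result<(), String> {
        self.subs.extend(store.load()?);
        self.store = Some(store);
        Ok(())
    }

    /// Step 2: change one entry, then save the full map.
    pub fn update_subscription(&mut self, id: &str, filter: Filter) -> Result<(), String> {
        self.subs.insert(id.to_string(), filter);
        self.persist()
    }

    /// Step 3: drop one entry, then save the updated map.
    pub fn remove_subscription(&mut self, id: &str) -> Result<(), String> {
        self.subs.remove(id);
        self.persist()
    }

    pub fn subscriptions(&self) -> &Filters {
        &self.subs
    }

    fn persist(&self) -> Result<(), String> {
        match &self.store {
            Some(store) => store.save(&self.subs),
            None => Ok(()), // no store configured: nothing to persist
        }
    }
}
```

Creating a second `ToyBus` and handing it the same `Arc<MemStore>` plays the role of a process restart: the new bus starts empty and picks up whatever the first one last saved.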

StateStore Trait

Implement the StateStore trait to plug in a custom state backend:

use a3s_event::{StateStore, SubscriptionFilter};
use a3s_event::Result;
use std::collections::HashMap;

pub trait StateStore: Send + Sync {
    /// Save all subscription filters (keyed by subscriber_id)
    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()>;

    /// Load all subscription filters
    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>>;
}
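
Because both methods take `&self` and the trait requires `Send + Sync`, an implementation needs interior mutability to keep any state. The sketch below shows one way to satisfy that with an `RwLock`. To compile without the crate it redeclares the trait locally and uses hypothetical stand-ins: the `SubscriptionFilter` fields, the `String`-based `Result` alias, `SnapshotStore`, and `save_from_thread` are all assumptions for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-ins so the sketch builds without a3s_event.
#[derive(Clone, Debug, PartialEq)]
pub struct SubscriptionFilter {
    pub subscriber_id: String,
    pub subjects: Vec<String>,
    pub durable: bool,
}
pub type Result<T> = std::result::Result<T, String>;

pub trait StateStore: Send + Sync {
    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()>;
    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>>;
}

/// Keeps the latest snapshot behind an RwLock: many readers or one writer.
#[derive(Default)]
pub struct SnapshotStore {
    snapshot: RwLock<HashMap<String, SubscriptionFilter>>,
}

impl StateStore for SnapshotStore {
    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
        let mut guard = self.snapshot.write().map_err(|e| e.to_string())?;
        *guard = subscriptions.clone(); // overwrite the snapshot, do not merge
        Ok(())
    }

    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
        let guard = self.snapshot.read().map_err(|e| e.to_string())?;
        Ok(guard.clone())
    }
}

/// Saves a one-entry map from a worker thread, then loads it on the caller's thread.
pub fn save_from_thread(
    store: Arc<dyn StateStore>,
    id: &str,
) -> Result<HashMap<String, SubscriptionFilter>> {
    let worker_store = Arc::clone(&store);
    let id = id.to_string();
    let handle = thread::spawn(move || {
        let mut subs = HashMap::new();
        let filter = SubscriptionFilter {
            subscriber_id: id.clone(),
            subjects: vec!["events.>".to_string()],
            durable: true,
        };
        subs.insert(id, filter);
        worker_store.save(&subs)
    });
    // First `?` handles a panicked worker, second `?` handles a failed save.
    handle.join().map_err(|_| "worker panicked".to_string())??;
    store.load()
}
```

The `Send + Sync` supertraits are what allow `Arc<dyn StateStore>` to move into the spawned thread; a store holding a plain `RefCell` would not compile here.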

Custom Example: Redis State Store

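use redis::Commands;

struct RedisStateStore {
    client: redis::Client,
    key: String,
}

// Note: `?` on redis and serde_json errors assumes a3s_event's error type
// converts from them; otherwise translate with map_err.
impl StateStore for RedisStateStore {
    fn save(&self, subscriptions: &HashMap<String, SubscriptionFilter>) -> Result<()> {
        let json = serde_json::to_string(subscriptions)?;
        // Commands methods take `&mut self`, so open a connection per call.
        // The call blocks: use it in a sync context or wrap it with block_on.
        let mut conn = self.client.get_connection()?;
        let _: () = conn.set(self.key.as_str(), json)?;
        Ok(())
    }

    fn load(&self) -> Result<HashMap<String, SubscriptionFilter>> {
        let mut conn = self.client.get_connection()?;
        let json: Option<String> = conn.get(self.key.as_str())?;
        match json {
            Some(data) => Ok(serde_json::from_str(&data)?),
            // Key not set yet: start with no subscriptions
            None => Ok(HashMap::new()),
        }
    }
}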

FileStateStore Details

  • Format: Pretty-printed JSON (human-readable)
  • Atomicity: Writes to a .tmp file then renames (prevents partial writes on crash)
  • Directory: Auto-creates parent directories if they don't exist
  • Path: Configurable, e.g., ./state/event-bus.json
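
The write-then-rename pattern and the directory auto-creation described above can be sketched with the standard library alone. This illustrates the general technique, not FileStateStore's actual code: `write_atomically` and `tmp_path` are hypothetical helpers, and both the exact temporary file name and the `sync_all` call are assumptions.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical helper: derive the temporary path by appending ".tmp".
fn tmp_path(path: &Path) -> PathBuf {
    let mut name = path.as_os_str().to_owned();
    name.push(".tmp");
    PathBuf::from(name)
}

/// Write `contents` so that `path` holds either the old data or the new data,
/// never a half-written file.
pub fn write_atomically(path: &Path, contents: &str) -> io::Result<()> {
    // Create missing parent directories first.
    if let Some(parent) = path.parent() {
        if !parent.as_os_str().is_empty() {
            fs::create_dir_all(parent)?;
        }
    }

    // Write the full payload to the temporary file.
    let tmp = tmp_path(path);
    let mut file = fs::File::create(&tmp)?;
    file.write_all(contents.as_bytes())?;
    file.sync_all()?; // push data to disk before the rename exposes it
    drop(file);

    // Swap the finished file into place; this replaces any existing file.
    fs::rename(&tmp, path)
}
```

If the process dies before the rename, the target file is untouched and only a stray `.tmp` file remains. The rename is only expected to be atomic when both paths live on the same filesystem, which holds here because the temporary file sits next to the target.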

Example persisted state:

{
  "order-processor": {
    "subscriberId": "order-processor",
    "subjects": ["events.orders.>"],
    "durable": true
  },
  "analytics": {
    "subscriberId": "analytics",
    "subjects": ["events.>"],
    "durable": false
  }
}

Accessors

// Check if a state store is configured
if let Some(store) = bus.state_store() {
    let filters = store.load()?;
    println!("{} persisted subscriptions", filters.len());
}