Event Routing

Knative-inspired Broker/Trigger pattern for event filtering and delivery

Event routing requires the routing feature, which is enabled by default.

A3S Event includes a Knative-inspired Broker/Trigger pattern for declarative event routing. Define triggers with filters, and the broker automatically routes matching events to their sinks.

Concepts

  • Broker — Receives events and evaluates them against registered triggers
  • Trigger — A filter + sink pair. When an event matches the filter, it's delivered to the sink
  • EventSink — A delivery target (topic, in-process handler, logger, etc.)
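To make the relationship between these three concepts concrete, here is a minimal, std-only model of the pattern. It is a sketch under stated assumptions, not the a3s_event implementation: the `Event`, `Sink`, `RecordingSink`, `Trigger`, and `Broker` types below are hypothetical stand-ins, delivery is synchronous, and filters are plain closures rather than `TriggerFilter` values.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical, simplified stand-ins; these are NOT the a3s_event types.
#[derive(Debug, Clone)]
struct Event {
    category: String,
    event_type: String,
}

/// A delivery target (the role EventSink plays in the text above).
trait Sink {
    fn deliver(&self, event: &Event);
}

/// A sink that records the event types it receives, so routing can be inspected.
#[derive(Default)]
struct RecordingSink {
    seen: RefCell<Vec<String>>,
}

impl Sink for RecordingSink {
    fn deliver(&self, event: &Event) {
        self.seen.borrow_mut().push(event.event_type.clone());
    }
}

/// A filter + sink pair.
struct Trigger {
    name: String,
    filter: Box<dyn Fn(&Event) -> bool>,
    sink: Rc<dyn Sink>,
}

/// Evaluates each event against every registered trigger.
#[derive(Default)]
struct Broker {
    triggers: Vec<Trigger>,
}

impl Broker {
    fn add_trigger(&mut self, trigger: Trigger) {
        self.triggers.push(trigger);
    }

    /// Delivers the event to every trigger whose filter matches and
    /// returns the names of the triggers that fired.
    fn route(&self, event: &Event) -> Vec<String> {
        let mut fired = Vec::new();
        for trigger in &self.triggers {
            if (trigger.filter)(event) {
                trigger.sink.deliver(event);
                fired.push(trigger.name.clone());
            }
        }
        fired
    }
}

/// Registers two triggers, routes one order event, and returns
/// (names of triggers that fired, event types the sink received).
fn route_demo() -> (Vec<String>, Vec<String>) {
    let sink = Rc::new(RecordingSink::default());
    let mut broker = Broker::default();
    broker.add_trigger(Trigger {
        name: "log-orders".to_string(),
        filter: Box::new(|e: &Event| e.category == "orders"),
        sink: sink.clone(),
    });
    broker.add_trigger(Trigger {
        name: "process-payments".to_string(),
        filter: Box::new(|e: &Event| e.event_type == "payment.completed"),
        sink: sink.clone(),
    });
    let event = Event {
        category: "orders".to_string(),
        event_type: "order.created".to_string(),
    };
    let fired = broker.route(&event);
    let seen = sink.seen.borrow().clone();
    (fired, seen)
}
```

In this model, calling `route_demo()` fires only the `log-orders` trigger, because the event's category matches the first filter while its type does not match the second.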

Creating a Broker

use a3s_event::broker::Broker;

let mut broker = Broker::new();

Defining Triggers

use a3s_event::broker::{Trigger, TriggerFilter};
use a3s_event::sink::{InProcessSink, LogSink};
use std::sync::Arc;

// Route all order events to a log sink
broker.add_trigger(Trigger::new(
    "log-orders",
    TriggerFilter::by_category("orders"),
    Arc::new(LogSink::default()),
));

// Route specific event types to an in-process handler
broker.add_trigger(Trigger::new(
    "process-payments",
    TriggerFilter::by_event_type("payment.completed"),
    Arc::new(InProcessSink::new("payment-handler", |event| async move {
        println!("Payment: {}", event.payload);
        Ok(())
    })),
));

Trigger Filters

pub struct TriggerFilter {
    pub category: Option<String>,
    pub event_type: Option<String>,
    pub subject_prefix: Option<String>,
}
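The struct above does not say how its three optional fields combine. One plausible reading, sketched below, is that an unset field matches any event and all set fields must match together (logical AND), with `subject_prefix` compared via `starts_with`. These semantics are an assumption, not confirmed behavior of a3s_event, and the `Event` type with its `subject` field is a hypothetical stand-in.

```rust
// Hypothetical minimal event; the real a3s_event::Event may differ.
struct Event {
    category: String,
    event_type: String,
    subject: String,
}

#[derive(Default)]
struct TriggerFilter {
    category: Option<String>,
    event_type: Option<String>,
    subject_prefix: Option<String>,
}

impl TriggerFilter {
    /// ASSUMPTION: `None` acts as a wildcard and all `Some` fields are ANDed.
    fn matches(&self, event: &Event) -> bool {
        let category_ok = self
            .category
            .as_deref()
            .map_or(true, |c| c == event.category);
        let type_ok = self
            .event_type
            .as_deref()
            .map_or(true, |t| t == event.event_type);
        let subject_ok = self
            .subject_prefix
            .as_deref()
            .map_or(true, |p| event.subject.starts_with(p));
        category_ok && type_ok && subject_ok
    }
}

/// Builds an event resembling the order event used elsewhere on this page.
fn sample_event() -> Event {
    Event {
        category: "orders".to_string(),
        event_type: "order.created".to_string(),
        subject: "events.orders.created".to_string(),
    }
}
```

Under these assumed semantics, an empty filter matches every event, and a filter that sets both `category` and `event_type` matches only events that satisfy both.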

Event Sinks

Built-in Sinks

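The sinks used on this page:

  • TopicSink: delivers events to a topic
  • InProcessSink: passes events to an in-process async handler
  • LogSink: logs events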

Custom Sinks

use a3s_event::sink::EventSink;
use a3s_event::Event;
use async_trait::async_trait;

struct WebhookSink {
    url: String,
    name: String,
}

#[async_trait]
impl EventSink for WebhookSink {
    async fn deliver(&self, event: &Event) -> Result<()> {
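        // Building a Client per call keeps the example short; reuse one in production.
        reqwest::Client::new()
            .post(&self.url)
            .json(event)
            .send()
            .await?
            // Treat HTTP 4xx/5xx responses as delivery failures.
            .error_for_status()?;
        Ok(())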
    }

    fn name(&self) -> &str {
        &self.name
    }
}

Routing Events

let event = Event::new(
    "events.orders.created", "orders",
    "New order", "order-service",
    serde_json::json!({"order_id": "ORD-001"}),
);

// Route through broker — delivers to all matching triggers
broker.route(&event).await?;

Integration with EventBus

When a broker is set on the EventBus, published events are automatically routed:

let mut bus = EventBus::new(MemoryProvider::default());
bus.set_broker(broker);

// This publish will also route through the broker's triggers
bus.publish("orders", "created", "New order", "src", payload).await?;
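The publish-then-route behavior can be pictured with a small std-only model. This is a sketch under stated assumptions rather than the a3s_event implementation: the `Event`, `Broker`, and `EventBus` types are hypothetical stand-ins, `publish` takes only two arguments here, and the order of operations (store first, then route) is a guess.

```rust
use std::cell::RefCell;

// Hypothetical stand-ins; NOT the a3s_event types.
#[derive(Clone, Debug, PartialEq)]
struct Event {
    category: String,
    topic: String,
}

/// A broker reduced to the one thing this sketch needs: remembering what it routed.
#[derive(Default)]
struct Broker {
    routed: RefCell<Vec<Event>>,
}

impl Broker {
    fn route(&self, event: &Event) {
        self.routed.borrow_mut().push(event.clone());
    }
}

#[derive(Default)]
struct EventBus {
    stored: Vec<Event>,
    broker: Option<Broker>,
}

impl EventBus {
    fn set_broker(&mut self, broker: Broker) {
        self.broker = Some(broker);
    }

    /// ASSUMPTION: the event is stored first, then routed if a broker is set.
    fn publish(&mut self, category: &str, topic: &str) {
        let event = Event {
            category: category.to_string(),
            topic: topic.to_string(),
        };
        self.stored.push(event.clone());
        if let Some(broker) = &self.broker {
            broker.route(&event);
        }
    }
}

/// Publishes once without a broker and once with one; returns
/// (number of events stored, number of events routed).
fn publish_demo() -> (usize, usize) {
    let mut bus = EventBus::default();
    bus.publish("orders", "created"); // no broker yet: stored only
    bus.set_broker(Broker::default());
    bus.publish("orders", "created"); // stored and routed
    let routed = bus.broker.as_ref().map_or(0, |b| b.routed.borrow().len());
    (bus.stored.len(), routed)
}
```

In this model, only events published after `set_broker` reach the broker, while every published event is stored.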

SinkDlqHandler

A dead-letter queue (DLQ) handler that forwards dead letters through an EventSink:

use a3s_event::dlq::SinkDlqHandler;
use a3s_event::sink::TopicSink;
use std::sync::Arc;

// `provider` and `bus` are assumed to be in scope already.

let dlq_sink = TopicSink::new("dlq-topic", provider.clone());
let dlq = SinkDlqHandler::new(Arc::new(dlq_sink), 100);
bus.set_dlq_handler(Arc::new(dlq));
