A3S Event
Event Routing
Knative-inspired Broker/Trigger pattern for event filtering and delivery
Requires the routing feature (enabled by default).
A3S Event includes a Knative-inspired Broker/Trigger pattern for declarative event routing. Define triggers with filters, and the broker automatically routes matching events to their sinks.
Concepts
- Broker — Receives events and evaluates them against registered triggers
- Trigger — A filter + sink pair. When an event matches the filter, it's delivered to the sink
- EventSink — A delivery target (topic, in-process handler, logger, etc.)
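The relationship between the three concepts can be sketched with a small, synchronous, std-only model. Every type below (Event, Sink, RecordingSink, Trigger, Broker) is a simplified stand-in written for illustration, not the a3s_event API: the real sinks are async, and the filter here is reduced to a single category check.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical, simplified event; the real a3s_event::Event carries more fields.
struct Event {
    category: String,
    event_type: String,
}

// A delivery target (synchronous here for brevity).
trait Sink {
    fn deliver(&self, event: &Event);
}

// A sink that records the event types it receives.
#[derive(Default)]
struct RecordingSink {
    seen: RefCell<Vec<String>>,
}

impl Sink for RecordingSink {
    fn deliver(&self, event: &Event) {
        self.seen.borrow_mut().push(event.event_type.clone());
    }
}

// A filter + sink pair; the filter is just a category in this sketch.
struct Trigger {
    name: String,
    category: String,
    sink: Rc<dyn Sink>,
}

impl Trigger {
    fn new(name: &str, category: &str, sink: Rc<dyn Sink>) -> Self {
        Self { name: name.to_string(), category: category.to_string(), sink }
    }
}

// Holds triggers and delivers each event to every trigger whose filter matches.
#[derive(Default)]
struct Broker {
    triggers: Vec<Trigger>,
}

impl Broker {
    fn add_trigger(&mut self, trigger: Trigger) {
        self.triggers.push(trigger);
    }

    // Returns the names of the triggers that fired for this event.
    fn route(&self, event: &Event) -> Vec<String> {
        let mut fired = Vec::new();
        for trigger in &self.triggers {
            if trigger.category == event.category {
                trigger.sink.deliver(event);
                fired.push(trigger.name.clone());
            }
        }
        fired
    }
}
```

In this model an event that matches no trigger is simply dropped by route; whether the real broker behaves the same way is not stated on this page.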
Creating a Broker
use a3s_event::broker::Broker;
let mut broker = Broker::new();
Defining Triggers
use a3s_event::broker::{Trigger, TriggerFilter};
use a3s_event::sink::{LogSink, InProcessSink, TopicSink};
use std::sync::Arc;
// Route all order events to a log sink
broker.add_trigger(Trigger::new(
    "log-orders",
    TriggerFilter::by_category("orders"),
    Arc::new(LogSink::default()),
));
// Route specific event types to an in-process handler
broker.add_trigger(Trigger::new(
    "process-payments",
    TriggerFilter::by_event_type("payment.completed"),
    Arc::new(InProcessSink::new("payment-handler", |event| async move {
        println!("Payment: {}", event.payload);
        Ok(())
    })),
));
Trigger Filters
pub struct TriggerFilter {
    pub category: Option<String>,
    pub event_type: Option<String>,
    pub subject_prefix: Option<String>,
}
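One plausible reading of the three optional fields is that an unset field matches anything and all set fields must match together, with subject_prefix compared as a string prefix. The sketch below implements that reading on a local copy of the struct. These matching rules are an assumption, not documented behavior, and EventMeta and matches are hypothetical names introduced for the example.

```rust
// Hypothetical view of the event fields a filter inspects.
struct EventMeta<'a> {
    category: &'a str,
    event_type: &'a str,
    subject: &'a str,
}

// Local copy of the filter shape shown above.
#[derive(Default)]
struct TriggerFilter {
    category: Option<String>,
    event_type: Option<String>,
    subject_prefix: Option<String>,
}

impl TriggerFilter {
    // Convenience constructor mirroring the by_category call used earlier.
    fn by_category(category: &str) -> Self {
        Self { category: Some(category.to_string()), ..Default::default() }
    }

    // Assumed semantics: None is a wildcard, and every set field must match.
    fn matches(&self, event: &EventMeta) -> bool {
        self.category.as_deref().map_or(true, |c| c == event.category)
            && self.event_type.as_deref().map_or(true, |t| t == event.event_type)
            && self
                .subject_prefix
                .as_deref()
                .map_or(true, |p| event.subject.starts_with(p))
    }
}
```

Under these assumed rules a filter with every field unset matches all events, and adding fields only ever narrows what a trigger receives.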
Event Sinks
Built-in Sinks
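The built-in sinks used on this page are LogSink, InProcessSink, and TopicSink, all imported from a3s_event::sink.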
Custom Sinks
use a3s_event::sink::EventSink;
use a3s_event::Event;
use async_trait::async_trait;
struct WebhookSink {
    url: String,
    name: String,
}
#[async_trait]
impl EventSink for WebhookSink {
    // POST the event as JSON; `.json(event)` requires Event to implement serde::Serialize
    async fn deliver(&self, event: &Event) -> Result<()> {
        reqwest::Client::new()
            .post(&self.url)
            .json(event)
            .send()
            .await?;
        Ok(())
    }
    // Name that identifies this sink
    fn name(&self) -> &str {
        &self.name
    }
}
Routing Events
let event = Event::new(
    "events.orders.created", "orders",
    "New order", "order-service",
    serde_json::json!({"order_id": "ORD-001"}),
);
// Route through the broker: delivers to all matching triggers
broker.route(&event).await?;
Integration with EventBus
When a broker is set on the EventBus, published events are automatically routed:
let mut bus = EventBus::new(MemoryProvider::default());
bus.set_broker(broker);
// This publish will also route through the broker's triggers
bus.publish("orders", "created", "New order", "src", payload).await?;
SinkDlqHandler
A dead-letter queue (DLQ) handler that forwards dead letters through an EventSink:
use a3s_event::dlq::SinkDlqHandler;
use a3s_event::sink::TopicSink;
use std::sync::Arc;
let dlq_sink = TopicSink::new("dlq-topic", provider.clone());
let dlq = SinkDlqHandler::new(Arc::new(dlq_sink), 100);
bus.set_dlq_handler(Arc::new(dlq));
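The forwarding idea can be sketched in isolation with std-only types. DeadLetter, Sink, MemorySink, and ForwardingDlq are hypothetical names used for illustration and do not mirror the a3s_event API. The sketch is synchronous, and it does not model the second argument passed to SinkDlqHandler::new above (100), whose meaning is not described on this page.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for a message that could not be delivered.
struct DeadLetter {
    subject: String,
    reason: String,
}

// Simplified synchronous sink; the real EventSink trait is async.
trait Sink: Send + Sync {
    fn deliver(&self, letter: &DeadLetter) -> Result<(), String>;
}

// Keeps delivered letters in memory, standing in for a topic-backed sink.
#[derive(Default)]
struct MemorySink {
    delivered: Mutex<Vec<String>>,
}

impl Sink for MemorySink {
    fn deliver(&self, letter: &DeadLetter) -> Result<(), String> {
        let mut delivered = self.delivered.lock().map_err(|e| e.to_string())?;
        delivered.push(format!("{}: {}", letter.subject, letter.reason));
        Ok(())
    }
}

// Forwards each dead letter to the wrapped sink and does nothing else.
struct ForwardingDlq {
    sink: Arc<dyn Sink>,
}

impl ForwardingDlq {
    fn new(sink: Arc<dyn Sink>) -> Self {
        Self { sink }
    }

    fn handle(&self, letter: &DeadLetter) -> Result<(), String> {
        self.sink.deliver(letter)
    }
}
```

Because the handler only depends on the sink trait, the same wrapper can point dead letters at a topic, a logger, or a custom sink without changing the handler itself.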