Event-Driven Architecture
In traditional request-driven architectures, services communicate through synchronous calls — Service A calls Service B, waits for the response, then proceeds. This creates temporal coupling (both must be available simultaneously), behavioral coupling (caller knows the callee's API), and cascading failures (if B is down, A fails too).
Event-driven architecture (EDA) flips this model. Instead of telling other services what to do, a service announces what happened. Interested services react independently. The producer doesn't know — or care — who consumes the event. This fundamental inversion enables loose coupling, independent scalability, and resilient systems that degrade gracefully.
- Event — something that happened (past tense): OrderPlaced, PaymentReceived, InventoryReserved
- Command — a request to do something (imperative): PlaceOrder, ChargePayment, ReserveInventory
- Query — a request to read something: GetOrderStatus, ListProducts
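The distinction shows up directly in the message shapes. A minimal sketch (the factory names and fields are illustrative, not a specific framework's API):

```javascript
// Events are immutable facts, named in past tense.
function makeEvent(eventType, aggregateId, data) {
  return Object.freeze({
    kind: 'event',
    eventType,            // e.g. "OrderPlaced" — something that already happened
    aggregateId,
    data,
    timestamp: new Date().toISOString()
  });
}

// Commands are requests, named imperatively; the target service may reject them.
function makeCommand(commandType, targetService, payload) {
  return {
    kind: 'command',
    commandType,          // e.g. "PlaceOrder" — asking a service to act
    targetService,
    payload
  };
}

const placed = makeEvent('OrderPlaced', 'order-98765', { totalAmount: 172.97 });
const place  = makeCommand('PlaceOrder', 'order-service', { items: [] });
```

An event has no addressee (anyone may consume it), while a command names the service expected to act — that asymmetry is the heart of the event/command split.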
Three Event Patterns
Martin Fowler identifies three distinct patterns for using events in distributed systems. Each has different trade-offs around coupling, latency, and data freshness.
1. Event Notification
The simplest pattern: the event is a thin notification that something happened. It carries minimal data — just enough for the consumer to decide whether to act, then fetch details from the source.
// Event Notification — thin payload
{
"eventType": "OrderPlaced",
"eventId": "evt-a1b2c3d4",
"timestamp": "2026-04-15T10:30:00Z",
"aggregateId": "order-98765",
"aggregateType": "Order"
// No order details — consumer must call Order Service API
}
// Consumer logic:
async function handleOrderPlaced(event) {
// Must call back to the source to get full data
const order = await orderServiceClient.getOrder(event.aggregateId);
await inventoryService.reserve(order.items);
}
| Pros | Cons |
|---|---|
| Low coupling — event schema is tiny | Runtime coupling — consumer must call the source |
| Easy to evolve — add new consumers without changing the event | Higher latency — extra network round-trip |
| Small event size | Source must handle increased read load from all consumers |
2. Event-Carried State Transfer (ECST)
The event carries all the data the consumer needs. No callback to the source required. Each consumer maintains its own local copy of the data it needs, updated by processing events.
// Event-Carried State Transfer — fat payload
{
"eventType": "OrderPlaced",
"eventId": "evt-a1b2c3d4",
"timestamp": "2026-04-15T10:30:00Z",
"aggregateId": "order-98765",
"data": {
"customerId": "cust-11223",
"customerEmail": "alice@example.com",
"customerTier": "premium",
"items": [
{
"productId": "prod-555",
"productName": "Wireless Headphones",
"sku": "WH-PRO-BLK",
"quantity": 2,
"unitPrice": 79.99
},
{
"productId": "prod-888",
"productName": "USB-C Cable",
"sku": "USB-C-2M",
"quantity": 1,
"unitPrice": 12.99
}
],
"shippingAddress": {
"street": "123 Main St",
"city": "Seattle",
"state": "WA",
"zip": "98101"
},
"totalAmount": 172.97,
"currency": "USD",
"paymentMethod": "credit_card"
}
}
// Consumer logic — completely self-contained:
async function handleOrderPlaced(event) {
// No API calls needed — all data is in the event
const { items, totalAmount, customerId } = event.data;
for (const item of items) {
await localInventoryDb.decrement(item.sku, item.quantity);
}
await localOrderDb.insert({
orderId: event.aggregateId,
customerId,
total: totalAmount,
status: 'pending_payment'
});
}
| Pros | Cons |
|---|---|
| Full autonomy — consumer never calls back | Larger events — more bandwidth |
| Lower latency — no extra round-trips | Data duplication across consumers |
| Resilient — works even if source is down | Eventual consistency — stale local copies |
3. Event Sourcing
Instead of storing current state, store the sequence of events that produced it. The event log is the source of truth. Current state is derived by replaying events.
// Event Sourcing — the event log IS the database
// Event Store for Order aggregate (order-98765):
┌─────┬───────────────────┬────────────────────────────────────┬──────────┐
│ Seq │ Event Type │ Data │ Timestamp│
├─────┼───────────────────┼────────────────────────────────────┼──────────┤
│ 1 │ OrderCreated │ {customerId:"cust-11223",items:[…]}│ 10:30:00 │
│ 2 │ PaymentAuthorized │ {paymentId:"pay-xyz",amount:172.97}│ 10:30:05 │
│ 3 │ InventoryReserved │ {warehouse:"SEA-1",items:[…]} │ 10:30:08 │
│ 4 │ OrderShipped │ {trackingId:"1Z999...",carrier:UPS}│ 10:45:00 │
│ 5 │ OrderDelivered │ {signature:"Alice",deliveredAt:...}│ 14:20:00 │
└─────┴───────────────────┴────────────────────────────────────┴──────────┘
// Rebuild current state by replaying events:
function rebuildOrder(events) {
let state = { status: 'unknown', items: [], payments: [] };
for (const event of events) {
switch (event.type) {
case 'OrderCreated':
state = {
...state,
orderId: event.aggregateId,
customerId: event.data.customerId,
items: event.data.items,
status: 'created'
};
break;
case 'PaymentAuthorized':
state = {
...state,
payments: [...state.payments, event.data],
status: 'paid'
};
break;
case 'InventoryReserved':
state = { ...state, warehouse: event.data.warehouse, status: 'reserved' };
break;
case 'OrderShipped':
state = { ...state, trackingId: event.data.trackingId, status: 'shipped' };
break;
case 'OrderDelivered':
state = { ...state, status: 'delivered', deliveredAt: event.data.deliveredAt };
break;
}
}
return state;
}
// Snapshots for performance (don't replay all events every time):
// Every 100 events, save a snapshot. Replay from last snapshot.
┌──────────────┬───────────────────────────┐
│ Snapshot #5 │ State at event seq 500 │
├──────────────┤ {status:'shipped',...} │
│ + Events │ seq 501, 502, 503... │
│ = Current │ Replay only 3 events │
└──────────────┴───────────────────────────┘
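The snapshot optimization sketched in JavaScript — an in-memory store where every Nth append saves a snapshot, and loading state replays only the events after the latest snapshot (the reducer and store shape are illustrative, not a specific event-store API):

```javascript
// Minimal event-sourced store with periodic snapshots.
const SNAPSHOT_EVERY = 100;

// Reducer: fold one event into the current state.
function applyEvent(state, event) {
  switch (event.type) {
    case 'OrderCreated':   return { ...state, status: 'created', items: event.data.items };
    case 'OrderShipped':   return { ...state, status: 'shipped' };
    case 'OrderDelivered': return { ...state, status: 'delivered' };
    default:               return state;
  }
}

// Start from the latest snapshot (if any), then replay only newer events.
function loadState(store) {
  const snap = store.snapshots[store.snapshots.length - 1];
  let state  = snap ? { ...snap.state } : { status: 'unknown', items: [] };
  const from = snap ? snap.seq : 0;
  for (const ev of store.events.filter(e => e.seq > from)) {
    state = applyEvent(state, ev);
  }
  return state;
}

// Append an event; save a snapshot every SNAPSHOT_EVERY events.
function append(store, event) {
  const seq = store.events.length + 1;
  store.events.push({ ...event, seq });
  if (seq % SNAPSHOT_EVERY === 0) {
    store.snapshots.push({ seq, state: loadState(store) });
  }
}
```

With a snapshot at seq 500, loading event 503 replays 3 events instead of 503.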
- Complete audit trail — every state change is recorded forever
- Time travel — rebuild state at any point in time
- Debug production issues — replay events to reproduce bugs exactly
- New projections — add a new read model by replaying the entire event log
- Regulatory compliance — financial systems, healthcare records
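The "new projections" point in practice: a read model is just a fold over the log, so adding one months later means replaying from seq 1 with a new reducer. A sketch (the projection shape is illustrative):

```javascript
// Build a brand-new read model by replaying the full event log.
// Projection: order count and lifetime spend per customer.
function projectCustomerStats(events) {
  const stats = {};
  for (const e of events) {
    if (e.type !== 'OrderCreated') continue;   // this projection only cares about one event type
    const id = e.data.customerId;
    stats[id] = stats[id] || { orders: 0, spend: 0 };
    stats[id].orders += 1;
    stats[id].spend  += e.data.totalAmount;
  }
  return stats;
}
```

No schema migration, no backfill job against production tables — the events already contain everything the new model needs.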
Event Brokers & Buses
An event broker is the infrastructure that routes events from producers to consumers. The choice of broker fundamentally shapes your architecture's capabilities.
Apache Kafka
The dominant choice for high-throughput, durable event streaming. Kafka is a distributed commit log — events are appended to partitioned topics and retained for a configurable duration (or forever with compaction).
// Kafka Topic Configuration for Order Events
// orders-topic: 12 partitions, replication factor 3
// Producer — Java (Spring Boot)
@Service
public class OrderEventProducer {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public CompletableFuture<SendResult<String, OrderEvent>> publishOrderPlaced(Order order) {
OrderEvent event = OrderEvent.builder()
.eventId(UUID.randomUUID().toString())
.eventType("OrderPlaced")
.aggregateId(order.getId())
.timestamp(Instant.now())
.data(OrderPlacedData.builder()
.customerId(order.getCustomerId())
.items(order.getItems())
.totalAmount(order.getTotalAmount())
.shippingAddress(order.getShippingAddress())
.build())
.build();
// Key = orderId → ensures all events for same order
// go to same partition → preserves ordering per order
return kafkaTemplate.send(
"orders-topic",
order.getId(), // partition key
event // value (serialized with Avro)
);
}
}
// Consumer — Java (Spring Boot)
@Service
public class PaymentEventConsumer {
@KafkaListener(
topics = "orders-topic",
groupId = "payment-service",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderPlaced(
@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack
) {
if (!"OrderPlaced".equals(event.getEventType())) { ack.acknowledge(); return; } // commit skipped events too
try {
paymentService.authorizePayment(
event.getAggregateId(),
event.getData().getTotalAmount(),
event.getData().getCustomerId()
);
ack.acknowledge(); // Manual commit after success
} catch (RetryableException e) {
// Don't ack — will be redelivered
throw e;
}
}
}
# Kafka Topic Configuration (CLI)
# retention.ms=604800000 → 7 days retention
kafka-topics.sh --create \
--topic orders-topic \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete \
--config min.insync.replicas=2 \
--config compression.type=lz4 \
--bootstrap-server kafka:9092
# Consumer Group Configuration
kafka-consumer-groups.sh --describe \
--group payment-service \
--bootstrap-server kafka:9092
# Output:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders-topic 0 15234 15240 6
# orders-topic 1 14890 14890 0
# orders-topic 2 15102 15105 3
# ...
# Key Kafka Concepts:
# ┌─────────────────────────────────────────────────────────────┐
# │ Topic: orders-topic (12 partitions) │
# │ │
# │ Partition 0: [evt1] [evt4] [evt7] [evt10] → offset 15240 │
# │ Partition 1: [evt2] [evt5] [evt8] [evt11] → offset 14890 │
# │ Partition 2: [evt3] [evt6] [evt9] [evt12] → offset 15105 │
# │ ... │
# │ │
# │ Consumer Group: payment-service (3 instances) │
# │ Instance A → Partitions 0,1,2,3 │
# │ Instance B → Partitions 4,5,6,7 │
# │ Instance C → Partitions 8,9,10,11 │
# │ │
# │ Consumer Group: inventory-service (2 instances) │
# │ Instance X → Partitions 0,1,2,3,4,5 │
# │ Instance Y → Partitions 6,7,8,9,10,11 │
# │ │
# │ Each group gets ALL events. Within a group, each partition │
# │ is consumed by exactly ONE instance → parallel processing. │
# └─────────────────────────────────────────────────────────────┘
RabbitMQ
A traditional message broker with sophisticated routing. Uses exchanges, queues, and bindings. Better for complex routing patterns, lower latency per message, and task queues.
# RabbitMQ — Exchange/Queue topology for order events
# Python with pika library
import pika, json, uuid
from datetime import datetime
# Publisher
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
# Declare a topic exchange for order events
channel.exchange_declare(
exchange='order-events',
exchange_type='topic',
durable=True
)
# Publish event with routing key
event = {
'eventId': str(uuid.uuid4()),
'eventType': 'OrderPlaced',
'aggregateId': 'order-98765',
'timestamp': datetime.utcnow().isoformat() + 'Z',  # UTC, ISO 8601
'data': {
'customerId': 'cust-11223',
'totalAmount': 172.97,
'items': [{'sku': 'WH-PRO-BLK', 'qty': 2}]
}
}
channel.basic_publish(
exchange='order-events',
routing_key='order.placed', # Routing key pattern
body=json.dumps(event),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent message
content_type='application/json',
message_id=event['eventId']
)
)
# ─── Consumer: Payment Service ───
# Binds to order.placed and order.cancelled
channel.queue_declare(queue='payment-service-queue', durable=True)
channel.queue_bind(
exchange='order-events',
queue='payment-service-queue',
routing_key='order.placed' # Only order.placed events
)
channel.queue_bind(
exchange='order-events',
queue='payment-service-queue',
routing_key='order.cancelled' # Also order.cancelled events
)
# ─── Consumer: Notification Service ───
# Binds to order.* (all order events)
channel.queue_declare(queue='notification-service-queue', durable=True)
channel.queue_bind(
exchange='order-events',
queue='notification-service-queue',
routing_key='order.*' # Wildcard — all order events
)
# ─── Consumer: Analytics Service ───
# Binds to '#' (matches every routing key)
channel.queue_declare(queue='analytics-service-queue', durable=True)
channel.queue_bind(
exchange='order-events',
queue='analytics-service-queue',
routing_key='#' # Hash — match everything
)
AWS EventBridge
A serverless event bus. No infrastructure to manage. Provides content-based filtering with event patterns.
// AWS EventBridge — event pattern matching
// Event structure:
{
"source": "com.ecommerce.orders",
"detail-type": "OrderPlaced",
"detail": {
"orderId": "order-98765",
"customerId": "cust-11223",
"totalAmount": 172.97,
"customerTier": "premium",
"region": "us-west-2"
}
}
// Rule 1: Route premium orders > $100 to fraud detection
{
"source": ["com.ecommerce.orders"],
"detail-type": ["OrderPlaced"],
"detail": {
"customerTier": ["premium"],
"totalAmount": [{ "numeric": [">", 100] }]
}
}
// → Target: Lambda function for fraud analysis
// Rule 2: Route ALL order events to analytics
{
"source": [{ "prefix": "com.ecommerce" }],
"detail-type": [{ "prefix": "Order" }]
}
// → Target: Kinesis Firehose → S3 → Athena
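EventBridge's matching semantics can be approximated in plain JavaScript. A simplified sketch covering literal lists, `prefix`, and two-operand `numeric` operators (the real service supports more, e.g. `anything-but`, `exists`, and multi-bound numeric ranges):

```javascript
// Simplified EventBridge-style pattern matcher.
// A field matches if the event value satisfies ANY entry in the pattern array.
function fieldMatches(value, alternatives) {
  return alternatives.some(alt => {
    if (alt !== null && typeof alt === 'object') {
      if ('prefix' in alt)  return typeof value === 'string' && value.startsWith(alt.prefix);
      if ('numeric' in alt) {
        const [op, bound] = alt.numeric;
        if (op === '>')  return value > bound;
        if (op === '>=') return value >= bound;
        if (op === '<')  return value < bound;
        if (op === '<=') return value <= bound;
        if (op === '=')  return value === bound;
        return false;
      }
      return false;
    }
    return value === alt; // literal match
  });
}

// An event matches if EVERY field named in the pattern matches (fields are ANDed).
function matchesPattern(event, pattern) {
  return Object.entries(pattern).every(([key, spec]) => {
    const value = event[key];
    if (Array.isArray(spec)) return fieldMatches(value, spec);
    // Nested object spec (e.g. "detail") — recurse into the sub-document.
    return value != null && typeof value === 'object' && matchesPattern(value, spec);
  });
}
```

Fields absent from the pattern are ignored, which is why adding new event fields never breaks existing rules.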
| Feature | Kafka | RabbitMQ | EventBridge |
|---|---|---|---|
| Model | Distributed log | Message broker | Serverless bus |
| Throughput | Millions/sec | ~50K/sec | Varies (managed) |
| Retention | Days/weeks/forever | Until consumed | None (24h delivery retries) |
| Ordering | Per-partition | Per-queue (with limits) | No guarantees |
| Replay | Yes (reset offset) | No (consumed = gone) | Archive to replay |
| Routing | By partition key | Complex (exchanges) | Content-based rules |
| Best for | Event streaming, log aggregation, high-throughput | Task queues, complex routing, RPC | Serverless, AWS-native, low-ops |
Event Schema Evolution
Events are contracts between services. As your system evolves, event schemas must change — but consumers can't all upgrade simultaneously. Schema evolution strategies prevent breaking changes from cascading through your architecture.
Apache Avro & Schema Registry
Avro is the de facto standard for Kafka event serialization. It provides compact binary encoding, schema evolution with compatibility rules, and a Schema Registry to enforce contracts.
// Avro Schema — OrderPlaced event (Version 1)
{
"type": "record",
"name": "OrderPlacedEvent",
"namespace": "com.ecommerce.orders.events",
"doc": "Published when a customer successfully places an order",
"fields": [
{
"name": "eventId",
"type": "string",
"doc": "Unique identifier for this event instance"
},
{
"name": "eventType",
"type": "string",
"default": "OrderPlaced"
},
{
"name": "timestamp",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "orderId",
"type": "string"
},
{
"name": "customerId",
"type": "string"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "sku", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "unitPrice", "type": "double"}
]
}
}
},
{
"name": "totalAmount",
"type": "double"
},
{
"name": "currency",
"type": "string",
"default": "USD"
}
]
}
// ─── Version 2: Add optional shippingAddress field ───
// BACKWARD COMPATIBLE — old consumers can read new events
// (they just ignore the new field)
{
"type": "record",
"name": "OrderPlacedEvent",
"namespace": "com.ecommerce.orders.events",
"fields": [
// ... all existing fields unchanged ...
{
"name": "shippingAddress",
"type": ["null", {
"type": "record",
"name": "Address",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "state", "type": "string"},
{"name": "zip", "type": "string"},
{"name": "country", "type": "string", "default": "US"}
]
}],
"default": null,
"doc": "Added in v2 — shipping destination"
},
{
"name": "customerTier",
"type": ["null", "string"],
"default": null,
"doc": "Added in v2 — standard, premium, enterprise"
}
]
}
# Schema Registry — enforcing compatibility
# Register schema for a topic's value:
curl -X POST http://schema-registry:8081/subjects/orders-topic-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"OrderPlacedEvent\",...}",
"schemaType": "AVRO"
}'
# Set compatibility level:
curl -X PUT http://schema-registry:8081/config/orders-topic-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Compatibility Levels:
# ┌──────────────────┬──────────────────────────────────────────────────┐
# │ Level │ Rule │
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ BACKWARD │ New schema can read old data │
# │ │ → Add optional fields, remove fields with default│
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ FORWARD │ Old schema can read new data │
# │ │ → Remove optional fields, add fields with default│
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ FULL │ Both backward and forward compatible │
# │ │ → Only add/remove optional fields with defaults │
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ NONE │ No compatibility checks (dangerous!) │
# └──────────────────┴──────────────────────────────────────────────────┘
# Test compatibility before deploying:
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-topic-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{...new schema...}"}'
# Response: {"is_compatible": true}
Eventual Consistency in Event-Driven Systems
In an event-driven system, data is eventually consistent by design. When the Order Service publishes OrderPlaced, the Inventory Service might not process it for seconds or even minutes. During that window, the inventory shows outdated quantities.
// Timeline of eventual consistency:
//
// T+0ms: Order Service → writes order to DB, publishes OrderPlaced
// T+50ms: Event arrives at Kafka broker, replicated to followers
// T+120ms: Payment Service polls, picks up event, starts processing
// T+350ms: Payment Service authorizes → publishes PaymentAuthorized
// T+400ms: Inventory Service picks up OrderPlaced → reserves stock
// T+500ms: Notification Service picks up OrderPlaced → sends email
//
// For 400ms, Inventory Service shows stale data!
// For 500ms, customer hasn't received confirmation email!
//
// This is FINE for most use cases. But you must design for it:
// Strategy 1: Optimistic UI — show "processing" immediately
// Strategy 2: Polling / WebSocket — client polls for final status
// Strategy 3: Correlation ID — track the saga across services
// Anti-pattern: Synchronous read-after-write
// ❌ POST /orders → publish event → immediately GET /inventory
// (inventory hasn't processed the event yet!)
//
// ✅ POST /orders → return 202 Accepted with orderId
// Client polls GET /orders/{id}/status until "confirmed"
- Return 202 Accepted instead of 200 OK for async operations
- Include a status endpoint clients can poll
- Use correlation IDs to trace events across services
- Idempotent operations — safe to retry if events are redelivered
- Compensating transactions — if a downstream step fails, publish a compensating event
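The last bullet — compensating transactions — can be sketched as a simple saga executor: each step pairs an action with a compensation, and when a step fails the already-completed steps are undone in reverse order. A sketch (step contents are illustrative; real compensations would publish compensating events):

```javascript
// Minimal saga executor with compensation on failure.
// steps: [{ action: async fn, compensate: async fn }, ...]
async function runSaga(steps) {
  const done = [];
  try {
    for (const step of steps) {
      await step.action();
      done.push(step);
    }
    return { status: 'completed' };
  } catch (err) {
    // Undo completed steps in reverse order.
    for (const step of done.reverse()) {
      await step.compensate();
    }
    return { status: 'compensated', reason: err.message };
  }
}
```

For an order saga, "compensate" for a reserved item means publishing an InventoryReleased event, not deleting history — the failure itself stays on the record.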
Event-Driven Flow in Action
When a customer places an order, the OrderPlaced event flows from the Order Service through the event bus to multiple independent consumers (payments, inventory, notifications), each processing the event at its own pace. One event triggers parallel, independent work across services that never call each other directly.
Dead Letter Queues (DLQ)
When an event handler fails repeatedly, you don't want it to block the entire queue forever. A Dead Letter Queue is a separate topic/queue where failed events are sent after exhausting retries. Engineers can inspect, fix, and replay them.
// Kafka DLQ Configuration — Spring Boot
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(errorHandler());
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
// Retry 3 times with exponential backoff, then send to DLQ
var backoff = new ExponentialBackOff(1000L, 2.0); // 1s, 2s, 4s
backoff.setMaxElapsedTime(10000L); // Max 10s total
var recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
// Route to DLQ topic: orders-topic.DLT
(record, ex) -> new TopicPartition(
record.topic() + ".DLT",
record.partition()
)
);
var handler = new DefaultErrorHandler(recoverer, backoff);
// Don't retry on these — they'll never succeed
handler.addNotRetryableExceptions(
DeserializationException.class,
ValidationException.class,
NullPointerException.class
);
return handler;
}
}
// DLQ Monitor — alerting on failed events
@KafkaListener(topics = "orders-topic.DLT", groupId = "dlq-monitor")
public void handleDeadLetter(
ConsumerRecord<String, byte[]> record,
@Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMsg,
@Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset
) {
log.error("DLQ event from {} offset {}: {}",
originalTopic, originalOffset, errorMsg);
// Alert ops team
alertService.sendSlackAlert(
"#order-alerts",
"Failed event in DLQ: " + errorMsg +
"\nOriginal topic: " + originalTopic +
"\nOffset: " + originalOffset
);
// Store for later replay
dlqRepository.save(DeadLetterRecord.builder()
.originalTopic(originalTopic)
.originalOffset(originalOffset)
.errorMessage(errorMsg)
.rawPayload(record.value())
.createdAt(Instant.now())
.status("PENDING_REVIEW")
.build());
}
Idempotent Event Handlers
In distributed systems, events can be delivered more than once — network retries, consumer rebalancing, at-least-once delivery semantics. Every event handler must be idempotent: processing the same event twice produces the same result as processing it once.
// ❌ Non-idempotent handler — processes payment twice!
async function handleOrderPlaced(event) {
await chargeCustomer(event.data.customerId, event.data.totalAmount);
// If this event is redelivered, customer is charged TWICE
}
// ✅ Idempotent handler — uses event ID for deduplication
async function handleOrderPlaced(event) {
// Check if we've already processed this exact event
const existing = await db.query(
'SELECT 1 FROM processed_events WHERE event_id = $1',
[event.eventId]
);
if (existing.rows.length > 0) {
console.log(`Event ${event.eventId} already processed, skipping`);
return; // Idempotent — safe to skip
}
// Process within a transaction
await db.transaction(async (tx) => {
// Record that we've seen this event
await tx.query(
'INSERT INTO processed_events (event_id, processed_at) VALUES ($1, NOW())',
[event.eventId]
);
// Do the actual work
await tx.query(
'INSERT INTO payments (order_id, amount, status) VALUES ($1, $2, $3)',
[event.aggregateId, event.data.totalAmount, 'authorized']
);
});
}
// Alternative: Use natural idempotency keys
// Instead of a separate dedup table, use unique constraints:
async function handleInventoryReserved(event) {
try {
await db.query(
`INSERT INTO reservations (order_id, sku, quantity, reserved_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (order_id, sku) DO NOTHING`, // Idempotent!
[event.aggregateId, event.data.sku, event.data.quantity]
);
} catch (err) {
if (err.code === '23505') return; // Unique violation = already processed
throw err;
}
}
// Processed events table with TTL for cleanup:
CREATE TABLE processed_events (
event_id VARCHAR(64) PRIMARY KEY,
processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Cleanup events older than 7 days (they won't be redelivered)
-- Run as a cron job:
DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '7 days';
Event Ordering Challenges
Global ordering across all events is impossible at scale (it would require a single partition — no parallelism). Instead, event-driven systems provide partial ordering within a partition.
// Kafka ordering guarantees:
// ✅ Within a partition: Events are strictly ordered
// ❌ Across partitions: No ordering guarantee
// Problem: Two events for the same order land on different partitions
// → OrderPlaced and OrderCancelled arrive out of order!
// Solution: Use the aggregate ID as the partition key
// All events for order-98765 go to the SAME partition
producer.send(new ProducerRecord<>(
"orders-topic",
order.getId(), // KEY: order-98765 → always same partition
orderEvent // VALUE: the event
));
// Partition assignment: hash(key) % numPartitions
// order-98765 → hash("order-98765") % 12 = partition 7
// All events for this order: partition 7, in order
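The key-to-partition mapping can be sketched in JavaScript. Kafka's default partitioner actually uses murmur2; the rolling hash below is a stand-in that shows the property that matters — the same key always maps to the same partition:

```javascript
// Deterministic key → partition mapping (illustrative hash, NOT Kafka's murmur2).
function hashKey(key) {
  let h = 0;
  for (const ch of key) {
    h = (h * 31 + ch.charCodeAt(0)) >>> 0; // unsigned 32-bit rolling hash
  }
  return h;
}

function partitionFor(key, numPartitions) {
  return hashKey(key) % numPartitions;
}
```

Note the flip side: changing the partition count remaps keys, so events for an existing order can land on a new partition after the old ones — one reason partition counts are usually over-provisioned up front.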
// ───── Ordering across different aggregates ─────
// What if you need: "OrderPlaced THEN PaymentAuthorized"?
// These are from DIFFERENT services producing to DIFFERENT topics.
// NO broker guarantees this ordering.
// Solution: Causal ordering via event metadata
{
"eventId": "evt-pay-001",
"eventType": "PaymentAuthorized",
"causationId": "evt-ord-001", // The event that caused this
"correlationId": "saga-98765", // Groups all events in a saga
"timestamp": "2026-04-15T10:30:05Z"
}
// Consumer can rebuild causal order:
// 1. Buffer out-of-order events
// 2. Process only when causation event has been seen
// 3. Use a saga orchestrator to enforce ordering
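Steps 1–2 of that recipe can be sketched as a small causal buffer: an event is processed only once its causationId has been seen; otherwise it is parked until the missing predecessor arrives (a sketch — a production version would also need timeouts, persistence, and a cap on buffered events):

```javascript
// Buffer events until their causal predecessor has been processed.
function createCausalBuffer(handle) {
  const seen = new Set();     // eventIds already processed
  const waiting = new Map();  // causationId → events parked on it

  function deliver(event) {
    handle(event);
    seen.add(event.eventId);
    // Release anything that was waiting on this event.
    const parked = waiting.get(event.eventId) || [];
    waiting.delete(event.eventId);
    parked.forEach(receive);
  }

  function receive(event) {
    if (!event.causationId || seen.has(event.causationId)) {
      deliver(event);
    } else {
      const parked = waiting.get(event.causationId) || [];
      parked.push(event);
      waiting.set(event.causationId, parked);
    }
  }

  return { receive };
}
```

Delivering a parked event can recursively release events that were waiting on it, so whole causal chains unblock at once.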
The Outbox Pattern
The most dangerous problem in event-driven systems: dual writes. When a service writes to its database AND publishes an event, either can fail independently, leaving the system in an inconsistent state.
// ❌ The dual-write problem:
async function placeOrder(order) {
await db.insert('orders', order); // Step 1: DB write succeeds
await kafka.publish('OrderPlaced', order); // Step 2: Kafka publish FAILS!
// DB has the order, but no event was published
// Other services never learn about this order!
}
// Even reversing the order doesn't help:
async function placeOrder(order) {
await kafka.publish('OrderPlaced', order); // Step 1: Kafka publish succeeds
await db.insert('orders', order); // Step 2: DB write FAILS!
// Event was published, but order doesn't exist in DB
// Other services act on a phantom order!
}
// ✅ The Outbox Pattern — single atomic write
async function placeOrder(order) {
await db.transaction(async (tx) => {
// Write the order
await tx.insert('orders', order);
// Write the event to an outbox table IN THE SAME TRANSACTION
await tx.insert('outbox_events', {
id: uuid(),
aggregate_type: 'Order',
aggregate_id: order.id,
event_type: 'OrderPlaced',
payload: JSON.stringify({
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount
}),
created_at: new Date(),
published: false
});
});
// If TX commits → both order AND event are saved atomically
// If TX fails → neither is saved
}
-- Outbox table schema
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
published BOOLEAN NOT NULL DEFAULT FALSE,
published_at TIMESTAMP,
retry_count INT NOT NULL DEFAULT 0
);
CREATE INDEX idx_outbox_unpublished
ON outbox_events (created_at)
WHERE published = FALSE;
-- Polling-based relay (simple approach):
-- A background worker polls the outbox and publishes to Kafka
-- 1. Fetch unpublished events (batch of 100)
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE published = FALSE
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED; -- Skip rows being processed by other workers
-- 2. Publish each to Kafka
-- 3. Mark as published
UPDATE outbox_events
SET published = TRUE, published_at = NOW()
WHERE id IN (...published ids...);
-- 4. Cleanup old published events (keep 7 days for debugging)
DELETE FROM outbox_events
WHERE published = TRUE AND published_at < NOW() - INTERVAL '7 days';
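The relay's control flow, sketched in-memory (the real worker runs the SQL above against PostgreSQL and publishes to Kafka; the outbox array and publish callback here are stand-ins):

```javascript
// In-memory sketch of one iteration of the outbox relay loop.
// outbox: array of { id, event_type, payload, published }
function relayOnce(outbox, publish, batchSize = 100) {
  // 1. Fetch unpublished events (bounded batch, oldest first in a real query).
  const batch = outbox.filter(e => !e.published).slice(0, batchSize);
  for (const row of batch) {
    // 2. Publish to the broker. If this throws, the row stays unpublished
    //    and is retried on the next poll — at-least-once delivery.
    publish(row.event_type, row.payload);
    // 3. Mark as published only AFTER a successful publish.
    row.published = true;
    row.published_at = new Date();
  }
  return batch.length;
}
```

Because a crash between publish and mark-as-published causes a re-publish, consumers downstream must be idempotent — the relay guarantees at-least-once, never exactly-once.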
Change Data Capture (CDC) with Debezium
Instead of polling the outbox table, use Change Data Capture to stream database changes directly to Kafka. Debezium reads the database's transaction log (WAL in PostgreSQL, binlog in MySQL) and publishes change events — no polling, no missed events, near real-time.
// Debezium Connector Configuration — PostgreSQL → Kafka
// Register via Kafka Connect REST API:
POST http://kafka-connect:8083/connectors
{
"name": "order-service-outbox-connector",
"config": {
// Connector class
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
// Database connection
"database.hostname": "order-db.internal",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "${secrets.DB_PASSWORD}",
"database.dbname": "order_service",
"database.server.name": "order-db",
// WAL (Write-Ahead Log) configuration
"plugin.name": "pgoutput",
"slot.name": "debezium_order_slot",
"publication.name": "dbz_publication",
// Capture only the outbox table
"table.include.list": "public.outbox_events",
// Outbox Event Router — transforms outbox rows into proper events
"transforms": "outbox",
"transforms.outbox.type":
"io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.fields.additional.placement":
"aggregate_type:header:aggregateType",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events",
// Route to topic based on aggregate_type:
// aggregate_type = "Order" → topic = "Order.events"
// aggregate_type = "Payment" → topic = "Payment.events"
"transforms.outbox.route.by.field": "aggregate_type",
// Expand the string payload column into structured JSON in the event body
"transforms.outbox.table.expand.json.payload": true
}
}
// ─── How it works under the hood ───
//
// 1. Order Service writes to orders + outbox_events in one TX
// ┌─────────────────────────────────────┐
// │ BEGIN TRANSACTION │
// │ INSERT INTO orders (...) │
// │ INSERT INTO outbox_events (...) │
// │ COMMIT │
// └───────────┬─────────────────────────┘
// │
// 2. PostgreSQL writes to WAL (Write-Ahead Log)
// │
// ┌───────────▼─────────────────────────┐
// │ WAL: INSERT outbox_events │
// │ id=evt-001, type=OrderPlaced │
// │ aggregate_id=order-98765 │
// │ payload={...} │
// └───────────┬─────────────────────────┘
// │
// 3. Debezium reads WAL via logical replication
// │
// ┌───────────▼─────────────────────────┐
// │ Debezium EventRouter transforms │
// │ outbox row → Kafka event │
// │ Topic: Order.events │
// │ Key: order-98765 │
// │ Value: {eventType: OrderPlaced,...} │
// └───────────┬─────────────────────────┘
// │
// 4. Published to Kafka topic
// ▼
// ┌─────────────────────────────────────┐
// │ Kafka: Order.events │
// │ Partition 7 (by key order-98765) │
// │ Consumers: payment, inventory, etc. │
// └─────────────────────────────────────┘
# Debezium connector status check:
curl http://kafka-connect:8083/connectors/order-service-outbox-connector/status
{
"name": "order-service-outbox-connector",
"connector": { "state": "RUNNING", "worker_id": "connect-1:8083" },
"tasks": [
{ "id": 0, "state": "RUNNING", "worker_id": "connect-1:8083" }
]
}
# Monitor CDC lag — ensure Debezium keeps up with DB writes:
# Metric: debezium_postgres_streaming_lag_in_bytes
# Alert if lag > 10MB for > 5 minutes
# PostgreSQL replication slot monitoring:
SELECT slot_name, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_order_slot';
# ┌─────────────────────┬────────┬───────────┐
# │ slot_name │ active │ lag_bytes │
# ├─────────────────────┼────────┼───────────┤
# │ debezium_order_slot │ t │ 1024 │ ← Healthy: ~1KB lag
# └─────────────────────┴────────┴───────────┘
| Aspect | Polling | CDC (Debezium) |
|---|---|---|
| Latency | Poll interval (1–5s) | Near real-time (~100ms) |
| DB load | Constant queries | Reads WAL (minimal) |
| Complexity | Simple to implement | Requires Kafka Connect infra |
| Reliability | Can miss events on crash | WAL-based, at-least-once, no missed events |
Real-World: E-Commerce Order Processing Pipeline
Let's put it all together. Here's a complete event-driven order processing pipeline — from the moment a customer clicks "Place Order" to delivery confirmation.
// ═══════════════════════════════════════════════════════════════════
// E-Commerce Order Processing Pipeline — Full Event Flow
// ═══════════════════════════════════════════════════════════════════
// ─── Step 1: Order Service — Accepts the order ───
// API Gateway → Order Service
@PostMapping("/api/orders")
@Transactional
public ResponseEntity<OrderResponse> placeOrder(@RequestBody CreateOrderRequest req)
        throws JsonProcessingException {
    // Validate request. Note: this remote call runs inside the DB
    // transaction, so keep the client's timeout short
    var customer = customerClient.getCustomer(req.getCustomerId());
    if (customer == null) throw new CustomerNotFoundException(req.getCustomerId());
// Create order in DB
var order = Order.builder()
.id(OrderId.generate())
.customerId(req.getCustomerId())
.items(req.getItems().stream()
.map(i -> OrderItem.of(i.getProductId(), i.getQuantity(), i.getPrice()))
.toList())
.status(OrderStatus.PENDING)
.totalAmount(calculateTotal(req.getItems()))
.createdAt(Instant.now())
.build();
orderRepository.save(order);
// Write to outbox (same transaction!)
outboxRepository.save(OutboxEvent.builder()
.aggregateType("Order")
.aggregateId(order.getId().toString())
.eventType("OrderPlaced")
.payload(objectMapper.writeValueAsString(OrderPlacedPayload.from(order)))
.build());
// Return 202 Accepted — processing is async from here
return ResponseEntity.accepted()
.body(new OrderResponse(order.getId(), "PENDING",
"Order received, processing..."));
}
// Debezium CDC captures outbox insert → publishes to Order.events topic
// ─── Step 2: Payment Service — Authorizes payment ───
// Consumes: Order.events (OrderPlaced)
// Produces: Payment.events (PaymentAuthorized / PaymentFailed)
@KafkaListener(topics = "Order.events", groupId = "payment-service")
public void handleOrderEvent(OrderEvent event) {
if (!"OrderPlaced".equals(event.getEventType())) return;
// Idempotency check
if (processedEventRepo.existsById(event.getEventId())) {
log.info("Event {} already processed", event.getEventId());
return;
}
var payload = event.getPayloadAs(OrderPlacedPayload.class);
try {
// Call payment gateway (Stripe, etc.)
var charge = stripeClient.createCharge(
payload.getCustomerId(),
payload.getTotalAmount(),
payload.getCurrency(),
Map.of("orderId", payload.getOrderId())
);
// Save payment record + outbox event atomically
transactionTemplate.execute(status -> {
paymentRepository.save(Payment.builder()
.orderId(payload.getOrderId())
.chargeId(charge.getId())
.amount(payload.getTotalAmount())
.status(PaymentStatus.AUTHORIZED)
.build());
outboxRepository.save(OutboxEvent.builder()
.aggregateType("Payment")
.aggregateId(payload.getOrderId())
.eventType("PaymentAuthorized")
.payload(toJson(new PaymentAuthorizedPayload(
payload.getOrderId(), charge.getId(),
payload.getTotalAmount())))
.build());
processedEventRepo.save(new ProcessedEvent(event.getEventId()));
return null;
});
    } catch (PaymentDeclinedException e) {
        // Publish the failure event and mark the event processed in one
        // transaction, so a redelivery does not retry the declined charge
        transactionTemplate.execute(status -> {
            outboxRepository.save(OutboxEvent.builder()
                .aggregateType("Payment")
                .aggregateId(payload.getOrderId())
                .eventType("PaymentFailed")
                .payload(toJson(new PaymentFailedPayload(
                    payload.getOrderId(), e.getDeclineReason())))
                .build());
            processedEventRepo.save(new ProcessedEvent(event.getEventId()));
            return null;
        });
    }
}
// ─── Step 3: Inventory Service — Reserves stock ───
// Consumes: Order.events (OrderPlaced)
// Produces: Inventory.events (InventoryReserved / InsufficientStock)
@KafkaListener(topics = "Order.events", groupId = "inventory-service")
@Transactional // all reservations for the order commit or roll back together
public void handleOrderPlaced(OrderEvent event) {
    if (!"OrderPlaced".equals(event.getEventType())) return;
    // (idempotency check omitted for brevity; see Payment Service above)
    var payload = event.getPayloadAs(OrderPlacedPayload.class);
    for (var item : payload.getItems()) {
        int updated = jdbcTemplate.update(
            """
            UPDATE inventory
            SET reserved_qty = reserved_qty + ?,
                available_qty = available_qty - ?
            WHERE sku = ? AND available_qty >= ?
            """,
            item.getQuantity(), item.getQuantity(),
            item.getSku(), item.getQuantity()
        );
        if (updated == 0) {
            // Insufficient stock: publish a compensating event, then roll
            // back reservations already made for earlier items in this order
            // (publishEvent must use REQUIRES_NEW so its outbox row survives)
            publishEvent("InsufficientStock", payload.getOrderId(),
                new InsufficientStockPayload(item.getSku(), item.getQuantity()));
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            return;
        }
    }
    publishEvent("InventoryReserved", payload.getOrderId(),
        new InventoryReservedPayload(payload.getOrderId(), payload.getItems()));
}
// ─── Step 4: Notification Service — Sends confirmation ───
// Consumes: Order.events, Payment.events, Shipping.events
@KafkaListener(
topics = {"Order.events", "Payment.events", "Shipping.events"},
groupId = "notification-service"
)
public void handleEvent(GenericEvent event) {
switch (event.getEventType()) {
case "OrderPlaced" -> sendEmail(
event.getData("customerEmail"),
"Order Confirmed",
"orderConfirmation",
Map.of("orderId", event.getAggregateId(),
"items", event.getData("items"))
);
case "PaymentAuthorized" -> sendEmail(
lookupEmail(event.getAggregateId()),
"Payment Received",
"paymentReceipt",
Map.of("amount", event.getData("amount"),
"chargeId", event.getData("chargeId"))
);
case "OrderShipped" -> sendEmailAndSms(
lookupContact(event.getAggregateId()),
"Your Order Has Shipped!",
"shippingNotification",
Map.of("trackingId", event.getData("trackingId"),
"carrier", event.getData("carrier"),
"estimatedDelivery", event.getData("eta"))
);
}
}
// ─── Complete Event Flow Timeline ───
//
// T+0ms Customer clicks "Place Order"
// │
// T+50ms Order Service: validate → save order + outbox → return 202
// │
// T+150ms Debezium CDC: reads WAL → publishes OrderPlaced to Kafka
// │
// ├──→ Payment Service (group: payment-service)
// │ T+300ms Calls Stripe → PaymentAuthorized
// │
// ├──→ Inventory Service (group: inventory-service)
// │ T+250ms Reserves stock → InventoryReserved
// │
// └──→ Notification Service (group: notification-service)
// T+400ms Sends order confirmation email
//
// T+500ms Payment.events: PaymentAuthorized
// │
// ├──→ Order Service: updates order status to PAID
// └──→ Notification Service: sends payment receipt
//
// T+600ms Inventory.events: InventoryReserved
// │
// └──→ Shipping Service: creates shipment
// T+800ms Carrier API → OrderShipped
//
// T+900ms Shipping.events: OrderShipped
// │
// ├──→ Order Service: updates status to SHIPPED
// └──→ Notification Service: sends tracking email + SMS
//
// ═══════════════════════════════════════════════════════════════
// Total: ~900ms from click to all services updated
// Each service processed independently, no blocking calls
// If the Payment Service were slow (2s), Inventory and Notification
// would still have processed at T+250ms and T+400ms respectively
// ═══════════════════════════════════════════════════════════════
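The independence shown in the timeline holds only because each handler tolerates redelivery. Isolated from the payment handler above as a sketch, the idempotency guard reduces to a few lines; here a HashSet stands in for the processed_events table.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the idempotency guard from the payment handler. A real
// service inserts the event ID into a processed_events table in the
// same DB transaction as the side effects, so a crash between the two
// cannot cause double-processing on redelivery.
public class IdempotentHandler {
    private final Set<String> processed = new HashSet<>();
    int sideEffects = 0; // counts charges/reservations/emails performed

    public void handle(String eventId) {
        if (!processed.add(eventId)) return; // duplicate delivery, skip
        sideEffects++;                       // the actual work runs once
    }
}
```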
Key Takeaways
- Event notification is simple but creates runtime coupling (consumer must call back). Event-carried state transfer gives full autonomy at the cost of larger events and data duplication.
- Event sourcing stores events as the source of truth — powerful for audit trails and time-travel debugging, but adds complexity for queries (use CQRS projections).
- Kafka is ideal for high-throughput event streaming with ordering guarantees per partition. RabbitMQ excels at complex routing and task queues. EventBridge is the serverless option for AWS-native architectures.
- Schema evolution with Avro + Schema Registry prevents breaking changes from cascading across services. Always use BACKWARD or FULL compatibility.
- The Outbox Pattern solves the dual-write problem — write data and events in a single database transaction. Pair with CDC (Debezium) for near real-time, reliable event publishing.
- Idempotent handlers are non-negotiable — events will be delivered more than once. Use event ID deduplication or natural idempotency keys.
- Dead Letter Queues prevent poison messages from blocking your pipeline. Monitor, alert, and replay.
- Event ordering is guaranteed only within a Kafka partition. Use aggregate IDs as partition keys and causal ordering metadata for cross-service flows.
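The last point can be demonstrated with a toy partitioner. Kafka's default partitioner hashes the record key with murmur2; the hash below is a simplification, but it shows the same property: a fixed key always maps to the same partition.

```java
import java.nio.charset.StandardCharsets;

// Illustrative key-based partitioner (not Kafka's actual murmur2).
// Any deterministic hash shows the property that matters: all events
// keyed by one aggregate ID land on one partition, so they are
// consumed in order relative to each other.
public class KeyPartitioner {
    public static int partitionFor(String key, int numPartitions) {
        int hash = 0;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash = 31 * hash + b;
        }
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

Every event keyed by order-98765 (OrderPlaced, PaymentAuthorized, OrderShipped) maps to the same partition and is consumed in sequence; events for different orders may interleave freely across partitions.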