High Level Design Series · Architecture Patterns · Part 2 · Post 38 of 70

Event-Driven Architecture

In traditional request-driven architectures, services communicate through synchronous calls — Service A calls Service B, waits for the response, then proceeds. This creates temporal coupling (both must be available simultaneously), behavioral coupling (caller knows the callee's API), and cascading failures (if B is down, A fails too).

Event-driven architecture (EDA) flips this model. Instead of telling other services what to do, a service announces what happened. Interested services react independently. The producer doesn't know — or care — who consumes the event. This fundamental inversion enables loose coupling, independent scalability, and resilient systems that degrade gracefully.

Event vs. Command vs. Query:
  • Event — something that happened (past tense): OrderPlaced, PaymentReceived, InventoryReserved
  • Command — a request to do something (imperative): PlaceOrder, ChargePayment, ReserveInventory
  • Query — a request to read something: GetOrderStatus, ListProducts
Events are facts. They are immutable, past-tense, and cannot be rejected. Commands can fail; events have already happened.
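
The difference between a command and an event can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical in-memory `placeOrder` handler and `eventLog` array (neither comes from a library): the command can be rejected, while the event it produces is frozen and only records what already happened.

```javascript
// Hypothetical command handler: validates a PlaceOrder command and,
// only if validation passes, records an immutable OrderPlaced event.
function placeOrder(command, eventLog) {
  if (!Array.isArray(command.items) || command.items.length === 0) {
    // Commands can be rejected: nothing has happened yet.
    return { accepted: false, reason: 'order has no items' };
  }
  const event = Object.freeze({
    eventType: 'OrderPlaced',            // past tense: a fact
    aggregateId: command.orderId,
    data: Object.freeze({ items: [...command.items] }),
  });
  eventLog.push(event);
  return { accepted: true, event };
}

const eventLog = [];
const rejected = placeOrder({ orderId: 'order-1', items: [] }, eventLog);
const accepted = placeOrder({ orderId: 'order-2', items: ['WH-PRO-BLK'] }, eventLog);

console.log(rejected.accepted, accepted.event.eventType, eventLog.length);
```

Only the accepted command leaves a trace in the log; the rejected one produces no event at all.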

Three Event Patterns

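Martin Fowler's article "What do you mean by Event-Driven?" separates several distinct ways of using events in distributed systems. Three of them are covered here (his fourth, CQRS, comes up under Event Sourcing). Each has different trade-offs around coupling, latency, and data freshness.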

1. Event Notification

The simplest pattern: the event is a thin notification that something happened. It carries minimal data — just enough for the consumer to decide whether to act, then fetch details from the source.

// Event Notification — thin payload
{
  "eventType": "OrderPlaced",
  "eventId": "evt-a1b2c3d4",
  "timestamp": "2026-04-15T10:30:00Z",
  "aggregateId": "order-98765",
  "aggregateType": "Order"
  // No order details — consumer must call Order Service API
}

// Consumer logic:
async function handleOrderPlaced(event) {
  // Must call back to the source to get full data
  const order = await orderServiceClient.getOrder(event.aggregateId);
  await inventoryService.reserve(order.items);
}

Pros                                                         | Cons
Low coupling: event schema is tiny                           | Runtime coupling: consumer must call the source
Easy to evolve: add new consumers without changing the event | Higher latency: extra network round-trip
Small event size                                             | Source must handle increased read load from all consumers

2. Event-Carried State Transfer (ECST)

The event carries all the data the consumer needs. No callback to the source required. Each consumer maintains its own local copy of the data it needs, updated by processing events.

// Event-Carried State Transfer — fat payload
{
  "eventType": "OrderPlaced",
  "eventId": "evt-a1b2c3d4",
  "timestamp": "2026-04-15T10:30:00Z",
  "aggregateId": "order-98765",
  "data": {
    "customerId": "cust-11223",
    "customerEmail": "alice@example.com",
    "customerTier": "premium",
    "items": [
      {
        "productId": "prod-555",
        "productName": "Wireless Headphones",
        "sku": "WH-PRO-BLK",
        "quantity": 2,
        "unitPrice": 79.99
      },
      {
        "productId": "prod-888",
        "productName": "USB-C Cable",
        "sku": "USB-C-2M",
        "quantity": 1,
        "unitPrice": 12.99
      }
    ],
    "shippingAddress": {
      "street": "123 Main St",
      "city": "Seattle",
      "state": "WA",
      "zip": "98101"
    },
    "totalAmount": 172.97,
    "currency": "USD",
    "paymentMethod": "credit_card"
  }
}

// Consumer logic — completely self-contained:
async function handleOrderPlaced(event) {
  // No API calls needed — all data is in the event
  const { items, totalAmount, customerId } = event.data;
  for (const item of items) {
    await localInventoryDb.decrement(item.sku, item.quantity);
  }
  await localOrderDb.insert({
    orderId: event.aggregateId,
    customerId,
    total: totalAmount,
    status: 'pending_payment'
  });
}

Pros                                     | Cons
Full autonomy: consumer never calls back | Larger events: more bandwidth
Lower latency: no extra round-trips      | Data duplication across consumers
Resilient: works even if source is down  | Eventual consistency: stale local copies

3. Event Sourcing

Instead of storing current state, store the sequence of events that produced it. The event log is the source of truth. Current state is derived by replaying events.

// Event Sourcing — the event log IS the database

// Event Store for Order aggregate (order-98765):
┌─────┬───────────────────┬────────────────────────────────────┬──────────┐
│ Seq │ Event Type        │ Data                               │ Timestamp│
├─────┼───────────────────┼────────────────────────────────────┼──────────┤
│  1  │ OrderCreated      │ {customerId:"cust-11223",items:[…]}│ 10:30:00 │
│  2  │ PaymentAuthorized │ {paymentId:"pay-xyz",amount:172.97}│ 10:30:05 │
│  3  │ InventoryReserved │ {warehouse:"SEA-1",items:[…]}      │ 10:30:08 │
│  4  │ OrderShipped      │ {trackingId:"1Z999...",carrier:UPS}│ 10:45:00 │
│  5  │ OrderDelivered    │ {signature:"Alice",deliveredAt:...}│ 14:20:00 │
└─────┴───────────────────┴────────────────────────────────────┴──────────┘

// Rebuild current state by replaying events:
function rebuildOrder(events) {
  let state = { status: 'unknown', items: [], payments: [] };

  for (const event of events) {
    switch (event.eventType) {  // same field name as the event payloads above
      case 'OrderCreated':
        state = {
          ...state,
          orderId: event.aggregateId,
          customerId: event.data.customerId,
          items: event.data.items,
          status: 'created'
        };
        break;
      case 'PaymentAuthorized':
        state = {
          ...state,
          payments: [...state.payments, event.data],
          status: 'paid'
        };
        break;
      case 'InventoryReserved':
        state = { ...state, warehouse: event.data.warehouse, status: 'reserved' };
        break;
      case 'OrderShipped':
        state = { ...state, trackingId: event.data.trackingId, status: 'shipped' };
        break;
      case 'OrderDelivered':
        state = { ...state, status: 'delivered', deliveredAt: event.data.deliveredAt };
        break;
    }
  }
  return state;
}

// Snapshots for performance (don't replay all events every time):
// Every 100 events, save a snapshot. Replay from last snapshot.
┌──────────────┬───────────────────────────┐
│ Snapshot #5  │ State at event seq 500    │
├──────────────┤ {status:'shipped',...}    │
│ + Events     │ seq 501, 502, 503...      │
│ = Current    │ Replay only 3 events      │
└──────────────┴───────────────────────────┘
Why Event Sourcing?
  • Complete audit trail — every state change is recorded forever
  • Time travel — rebuild state at any point in time
  • Debug production issues — replay events to reproduce bugs exactly
  • New projections — add a new read model by replaying the entire event log
  • Regulatory compliance — financial systems, healthcare records
Trade-offs: Eventual consistency, complex queries (need projections/CQRS), event schema evolution, learning curve.
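
Snapshots and time travel both fall out of the same replay loop. The sketch below is a reduced, hypothetical version of the replay logic (the `applyEvent` and `loadState` names, the `seq` field, and the snapshot shape are assumptions made for illustration): it starts from a snapshot when one exists and stops at a chosen sequence number when asked.

```javascript
// Fold one event into the state (a reduced version of the replay logic).
function applyEvent(state, event) {
  switch (event.eventType) {
    case 'OrderCreated':      return { ...state, status: 'created' };
    case 'PaymentAuthorized': return { ...state, status: 'paid' };
    case 'OrderShipped':      return { ...state, status: 'shipped' };
    case 'OrderDelivered':    return { ...state, status: 'delivered' };
    default:                  return state; // unknown events are ignored
  }
}

// Rebuild state from an optional snapshot plus the events after it.
// Passing `upToSeq` gives "time travel": the state as of that sequence number.
function loadState(events, snapshot = null, upToSeq = Infinity) {
  let state = snapshot ? snapshot.state : { status: 'unknown' };
  const fromSeq = snapshot ? snapshot.seq : 0;
  for (const event of events) {
    if (event.seq > fromSeq && event.seq <= upToSeq) {
      state = applyEvent(state, event);
    }
  }
  return state;
}

const log = [
  { seq: 1, eventType: 'OrderCreated' },
  { seq: 2, eventType: 'PaymentAuthorized' },
  { seq: 3, eventType: 'OrderShipped' },
  { seq: 4, eventType: 'OrderDelivered' },
];
const snapshotAt2 = { seq: 2, state: loadState(log, null, 2) };

const current = loadState(log, snapshotAt2); // replays only seq 3 and 4
const asOfSeq3 = loadState(log, null, 3);    // state before delivery

console.log(current.status, asOfSeq3.status);
```

A real event store would read only the events after the snapshot from storage instead of filtering an in-memory array, but the fold itself stays the same.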

Event Brokers & Buses

An event broker is the infrastructure that routes events from producers to consumers. The choice of broker fundamentally shapes your architecture's capabilities.

Apache Kafka

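The dominant choice for high-throughput, durable event streaming. Kafka is a distributed commit log: events are appended to partitioned topics and retained for a configurable duration, or indefinitely if time-based retention is disabled (retention.ms=-1). Log compaction is a separate cleanup policy that keeps at least the latest event per key rather than the full history.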

// Kafka Topic Configuration for Order Events
// orders-topic: 12 partitions, replication factor 3

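// Producer: Java (Spring Boot, Spring for Apache Kafka)
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class OrderEventProducer {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Constructor injection: the final field has to be assigned here
    public OrderEventProducer(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // send() returns CompletableFuture in Spring Kafka 3.x (ListenableFuture in 2.x)
    public CompletableFuture<SendResult<String, OrderEvent>> publishOrderPlaced(Order order) {
        OrderEvent event = OrderEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .eventType("OrderPlaced")
            .aggregateId(order.getId())
            .timestamp(Instant.now())
            .data(OrderPlacedData.builder()
                .customerId(order.getCustomerId())
                .items(order.getItems())
                .totalAmount(order.getTotalAmount())
                .shippingAddress(order.getShippingAddress())
                .build())
            .build();

        // Key = orderId → ensures all events for same order
        // go to same partition → preserves ordering per order
        return kafkaTemplate.send(
            "orders-topic",
            order.getId(),      // partition key
            event               // value (serialized with Avro)
        );
    }
}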

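// Consumer: Java (Spring Boot)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class PaymentEventConsumer {

    // PaymentService is the application's own payment component
    private final PaymentService paymentService;

    public PaymentEventConsumer(PaymentService paymentService) {
        this.paymentService = paymentService;
    }

    // The container factory must use AckMode.MANUAL, otherwise no
    // Acknowledgment argument is available to the listener.
    @KafkaListener(
        topics = "orders-topic",
        groupId = "payment-service",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleOrderPlaced(
        @Payload OrderEvent event,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        Acknowledgment ack
    ) {
        if (!"OrderPlaced".equals(event.getEventType())) {
            ack.acknowledge();  // Commit the offset for skipped events too
            return;
        }

        try {
            paymentService.authorizePayment(
                event.getAggregateId(),
                event.getData().getTotalAmount(),
                event.getData().getCustomerId()
            );
            ack.acknowledge();  // Manual commit after success
        } catch (RetryableException e) {
            // Don't ack: the error handler will redeliver the record
            throw e;
        }
    }
}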

# Kafka Topic Configuration (CLI)
# retention.ms=604800000 → 7 days retention
kafka-topics.sh --create \
  --topic orders-topic \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config min.insync.replicas=2 \
  --config compression.type=lz4 \
  --bootstrap-server kafka:9092

# Inspect consumer group offsets and lag
kafka-consumer-groups.sh --describe \
  --group payment-service \
  --bootstrap-server kafka:9092

# Output:
# TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders-topic   0          15234           15240           6
# orders-topic   1          14890           14890           0
# orders-topic   2          15102           15105           3
# ...

# Key Kafka Concepts:
# ┌─────────────────────────────────────────────────────────────┐
# │ Topic: orders-topic (12 partitions)                        │
# │                                                             │
# │ Partition 0: [evt1] [evt4] [evt7] [evt10] → offset 15240  │
# │ Partition 1: [evt2] [evt5] [evt8] [evt11] → offset 14890  │
# │ Partition 2: [evt3] [evt6] [evt9] [evt12] → offset 15105  │
# │ ...                                                         │
# │                                                             │
# │ Consumer Group: payment-service (3 instances)              │
# │   Instance A → Partitions 0,1,2,3                          │
# │   Instance B → Partitions 4,5,6,7                          │
# │   Instance C → Partitions 8,9,10,11                        │
# │                                                             │
# │ Consumer Group: inventory-service (2 instances)            │
# │   Instance X → Partitions 0,1,2,3,4,5                      │
# │   Instance Y → Partitions 6,7,8,9,10,11                    │
# │                                                             │
# │ Each group gets ALL events. Within a group, each partition │
# │ is consumed by exactly ONE instance → parallel processing. │
# └─────────────────────────────────────────────────────────────┘
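
The partition-to-instance mapping in the diagram above can be reproduced with a small function. This is a sketch of a range-style split written for illustration (the `assignPartitions` name is hypothetical), not Kafka's actual assignor code; it only shows why each partition ends up with exactly one owner per group.

```javascript
// Range-style assignment: split partitions 0..n-1 into contiguous blocks,
// one block per consumer instance. Leftover partitions go to the first instances.
function assignPartitions(numPartitions, instances) {
  const assignment = {};
  const base = Math.floor(numPartitions / instances.length);
  const extra = numPartitions % instances.length;
  let next = 0;
  instances.forEach((name, i) => {
    const count = base + (i < extra ? 1 : 0);
    assignment[name] = Array.from({ length: count }, (_, k) => next + k);
    next += count;
  });
  return assignment;
}

// Each consumer group is assigned independently over the same 12 partitions.
const paymentGroup = assignPartitions(12, ['A', 'B', 'C']);
const inventoryGroup = assignPartitions(12, ['X', 'Y']);
console.log(paymentGroup);
console.log(inventoryGroup);

// Adding a fourth payment instance rebalances to 3 partitions each.
const scaledOut = assignPartitions(12, ['A', 'B', 'C', 'D']);
console.log(scaledOut);
```

Because a partition has a single owner within a group, the partition count is the upper bound on useful parallelism: with 12 partitions, a 13th instance in the same group would receive an empty assignment.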

RabbitMQ

A traditional message broker with sophisticated routing. Uses exchanges, queues, and bindings. Better for complex routing patterns, lower latency per message, and task queues.

# RabbitMQ — Exchange/Queue topology for order events
# Python with pika library

import json
import uuid
from datetime import datetime, timezone

import pika

# Publisher
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()

# Declare a topic exchange for order events
channel.exchange_declare(
    exchange='order-events',
    exchange_type='topic',
    durable=True
)

# Publish event with routing key.
# Queues must already be bound to the exchange when this runs; a message
# that matches no binding is dropped by the broker.
event = {
    'eventId': str(uuid.uuid4()),
    'eventType': 'OrderPlaced',
    'aggregateId': 'order-98765',
    'timestamp': datetime.now(timezone.utc).isoformat(),  # timezone-aware UTC
    'data': {
        'customerId': 'cust-11223',
        'totalAmount': 172.97,
        'items': [{'sku': 'WH-PRO-BLK', 'qty': 2}]
    }
}

channel.basic_publish(
    exchange='order-events',
    routing_key='order.placed',       # Routing key pattern
    body=json.dumps(event),
    properties=pika.BasicProperties(
        delivery_mode=2,              # Persistent message
        content_type='application/json',
        message_id=event['eventId']
    )
)

# ─── Consumer: Payment Service ───
# Binds to order.placed and order.cancelled
channel.queue_declare(queue='payment-service-queue', durable=True)
channel.queue_bind(
    exchange='order-events',
    queue='payment-service-queue',
    routing_key='order.placed'        # Only order.placed events
)
channel.queue_bind(
    exchange='order-events',
    queue='payment-service-queue',
    routing_key='order.cancelled'     # Also order.cancelled events
)

# ─── Consumer: Notification Service ───
# Binds to order.* (all order events)
channel.queue_declare(queue='notification-service-queue', durable=True)
channel.queue_bind(
    exchange='order-events',
    queue='notification-service-queue',
    routing_key='order.*'             # Wildcard — all order events
)

# ─── Consumer: Analytics Service ───
# Binds to # (every routing key published to this exchange, any number of words)
channel.queue_declare(queue='analytics-service-queue', durable=True)
channel.queue_bind(
    exchange='order-events',
    queue='analytics-service-queue',
    routing_key='#'                   # Hash — match everything
)
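
How the three binding styles above select messages can be sketched with a small matcher. This is an illustrative reimplementation of topic-exchange matching rules (`*` matches exactly one dot-separated word, `#` matches zero or more words); the `topicMatches` and `queuesFor` names are hypothetical and the code does not talk to a broker.

```javascript
// Returns true when a topic-exchange binding pattern matches a routing key.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');
  function match(i, j) {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may consume zero or more words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }
  return match(0, 0);
}

// Same bindings as the queue declarations above.
const bindings = {
  'payment-service-queue': ['order.placed', 'order.cancelled'],
  'notification-service-queue': ['order.*'],
  'analytics-service-queue': ['#'],
};

function queuesFor(routingKey) {
  return Object.keys(bindings).filter((queue) =>
    bindings[queue].some((pattern) => topicMatches(pattern, routingKey)));
}

console.log(queuesFor('order.placed'));
console.log(queuesFor('order.shipped'));
console.log(queuesFor('payment.authorized'));
```

An `order.shipped` message skips the payment queue, and a `payment.authorized` message reaches only the analytics queue, which is the behavior the bindings are meant to express.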

AWS EventBridge

A serverless event bus. No infrastructure to manage. Provides content-based filtering with event patterns.

// AWS EventBridge — event pattern matching
// Event structure:
{
  "source": "com.ecommerce.orders",
  "detail-type": "OrderPlaced",
  "detail": {
    "orderId": "order-98765",
    "customerId": "cust-11223",
    "totalAmount": 172.97,
    "customerTier": "premium",
    "region": "us-west-2"
  }
}

// Rule 1: Route premium orders > $100 to fraud detection
{
  "source": ["com.ecommerce.orders"],
  "detail-type": ["OrderPlaced"],
  "detail": {
    "customerTier": ["premium"],
    "totalAmount": [{ "numeric": [">", 100] }]
  }
}
// → Target: Lambda function for fraud analysis

// Rule 2: Route ALL order events to analytics
{
  "source": [{ "prefix": "com.ecommerce" }],
  "detail-type": [{ "prefix": "Order" }]
}
// → Target: Kinesis Firehose → S3 → Athena
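
To make the matching semantics of the two rules concrete, here is a small sketch of content-based filtering. It is a simplified, hypothetical matcher (`matchesPattern` and `matchesValue` are illustrative names) that covers only the three matcher kinds used above: literal values, `prefix`, and `numeric`. It is not the EventBridge implementation and ignores the rest of the pattern language.

```javascript
// Check one candidate matcher from a pattern array against an event value.
function matchesValue(matcher, value) {
  if (matcher !== null && typeof matcher === 'object') {
    if ('prefix' in matcher) {
      return typeof value === 'string' && value.startsWith(matcher.prefix);
    }
    if ('numeric' in matcher) {
      if (typeof value !== 'number') return false;
      const ops = matcher.numeric; // e.g. [">", 100] or [">", 0, "<=", 5]
      for (let i = 0; i < ops.length; i += 2) {
        const op = ops[i];
        const bound = ops[i + 1];
        const ok =
          op === '>' ? value > bound :
          op === '>=' ? value >= bound :
          op === '<' ? value < bound :
          op === '<=' ? value <= bound :
          op === '=' ? value === bound : false;
        if (!ok) return false;
      }
      return true;
    }
    return false; // matcher kind not covered by this sketch
  }
  return matcher === value; // literal match
}

// Every field in the pattern must match; arrays list alternatives.
function matchesPattern(pattern, event) {
  return Object.entries(pattern).every(([field, expected]) => {
    const actual = event === undefined || event === null ? undefined : event[field];
    if (Array.isArray(expected)) {
      return expected.some((matcher) => matchesValue(matcher, actual));
    }
    return actual !== undefined && matchesPattern(expected, actual); // nested object
  });
}

const fraudRule = {
  source: ['com.ecommerce.orders'],
  'detail-type': ['OrderPlaced'],
  detail: { customerTier: ['premium'], totalAmount: [{ numeric: ['>', 100] }] },
};
const analyticsRule = {
  source: [{ prefix: 'com.ecommerce' }],
  'detail-type': [{ prefix: 'Order' }],
};
const orderEvent = {
  source: 'com.ecommerce.orders',
  'detail-type': 'OrderPlaced',
  detail: { orderId: 'order-98765', customerTier: 'premium', totalAmount: 172.97 },
};
const smallOrder = { ...orderEvent, detail: { ...orderEvent.detail, totalAmount: 50 } };

console.log(matchesPattern(fraudRule, orderEvent), matchesPattern(fraudRule, smallOrder));
console.log(matchesPattern(analyticsRule, smallOrder));
```

The small order fails the fraud rule on the numeric condition but still matches the analytics rule, so one event can fan out to a different set of targets depending on its content.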

Feature    | Kafka                                             | RabbitMQ                          | EventBridge
Model      | Distributed log                                   | Message broker                    | Serverless bus
Throughput | Millions/sec                                      | ~50K/sec                          | Varies (managed)
Retention  | Days/weeks/forever                                | Until consumed                    | 24 hours retry
Ordering   | Per-partition                                     | Per-queue (with limits)           | No guarantees
Replay     | Yes (reset offset)                                | No (consumed = gone)              | Archive to replay
Routing    | By partition key                                  | Complex (exchanges)               | Content-based rules
Best for   | Event streaming, log aggregation, high-throughput | Task queues, complex routing, RPC | Serverless, AWS-native, low-ops

Event Schema Evolution

Events are contracts between services. As your system evolves, event schemas must change — but consumers can't all upgrade simultaneously. Schema evolution strategies prevent breaking changes from cascading through your architecture.

Apache Avro & Schema Registry

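Avro is a widely used serialization format for Kafka events. It provides compact binary encoding and well-defined schema resolution rules. A schema registry (the commands below use the Confluent Schema Registry REST API) stores the schemas and enforces compatibility between versions.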

// Avro Schema — OrderPlaced event (Version 1)
{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "com.ecommerce.orders.events",
  "doc": "Published when a customer successfully places an order",
  "fields": [
    {
      "name": "eventId",
      "type": "string",
      "doc": "Unique identifier for this event instance"
    },
    {
      "name": "eventType",
      "type": "string",
      "default": "OrderPlaced"
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "customerId",
      "type": "string"
    },
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "sku", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "unitPrice", "type": "double"}
          ]
        }
      }
    },
    {
      "name": "totalAmount",
      "type": "double"
    },
    {
      "name": "currency",
      "type": "string",
      "default": "USD"
    }
  ]
}

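// ─── Version 2: Add optional shippingAddress and customerTier fields ───
// Both new fields have defaults, so the change is compatible in both directions:
// v2 consumers can read v1 events (the defaults fill the gap), and
// v1 consumers can read v2 events (they ignore the new fields)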
{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "com.ecommerce.orders.events",
  "fields": [
    // ... all existing fields unchanged ...
    {
      "name": "shippingAddress",
      "type": ["null", {
        "type": "record",
        "name": "Address",
        "fields": [
          {"name": "street", "type": "string"},
          {"name": "city", "type": "string"},
          {"name": "state", "type": "string"},
          {"name": "zip", "type": "string"},
          {"name": "country", "type": "string", "default": "US"}
        ]
      }],
      "default": null,
      "doc": "Added in v2 — shipping destination"
    },
    {
      "name": "customerTier",
      "type": ["null", "string"],
      "default": null,
      "doc": "Added in v2 — standard, premium, enterprise"
    }
  ]
}
# Schema Registry — enforcing compatibility
# Register schema for a topic's value:
curl -X POST http://schema-registry:8081/subjects/orders-topic-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"OrderPlacedEvent\",...}",
    "schemaType": "AVRO"
  }'

# Set compatibility level:
curl -X PUT http://schema-registry:8081/config/orders-topic-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Compatibility Levels:
# ┌──────────────────┬──────────────────────────────────────────────────┐
# │ Level            │ Rule                                             │
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ BACKWARD         │ New schema can read old data                     │
# │                  │ → Add optional fields; delete any field          │
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ FORWARD          │ Old schema can read new data                     │
# │                  │ → Add any field; delete optional fields          │
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ FULL             │ Both backward and forward compatible             │
# │                  │ → Only add/remove optional fields with defaults  │
# ├──────────────────┼──────────────────────────────────────────────────┤
# │ NONE             │ No compatibility checks (dangerous!)             │
# └──────────────────┴──────────────────────────────────────────────────┘

# Test compatibility before deploying:
curl -X POST http://schema-registry:8081/compatibility/subjects/orders-topic-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{...new schema...}"}'
# Response: {"is_compatible": true}
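
Why defaults are what make a change compatible can be shown with a toy reader. The sketch below is loosely modeled on Avro's schema resolution rules but works on plain objects (real Avro resolves binary data using both the writer's and the reader's schema); `resolveRecord` and the field lists are hypothetical names for illustration.

```javascript
// Reader-side resolution on plain objects:
// - fields the reader does not know are dropped
// - fields missing from the data are filled from the reader's default
// - a missing field without a default cannot be resolved
function resolveRecord(record, readerFields) {
  const out = {};
  for (const field of readerFields) {
    if (field.name in record) {
      out[field.name] = record[field.name];
    } else if ('default' in field) {
      out[field.name] = field.default;
    } else {
      throw new Error(`cannot resolve field "${field.name}": no value and no default`);
    }
  }
  return out;
}

const v1Fields = [{ name: 'orderId' }, { name: 'totalAmount' }];
const v2Fields = [...v1Fields, { name: 'customerTier', default: null }];
const breakingFields = [...v1Fields, { name: 'warehouse' }]; // required, no default

const oldEvent = { orderId: 'order-98765', totalAmount: 172.97 };
const newEvent = { ...oldEvent, customerTier: 'premium' };

const backward = resolveRecord(oldEvent, v2Fields); // new reader, old data
const forward = resolveRecord(newEvent, v1Fields);  // old reader, new data
console.log(backward, forward);

try {
  resolveRecord(oldEvent, breakingFields);
} catch (err) {
  console.log(err.message); // a required field without a default breaks old data
}
```

The same check is what a registry performs before accepting a new schema version, just against schema definitions instead of sample records.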

Eventual Consistency in Event-Driven Systems

In an event-driven system, data is eventually consistent by design. When the Order Service publishes OrderPlaced, the Inventory Service might not process it for seconds or even minutes. During that window, the inventory shows outdated quantities.

// Timeline of eventual consistency:
//
// T+0ms:    Order Service → writes order to DB, publishes OrderPlaced
// T+50ms:   Event arrives at Kafka broker, replicated to followers
// T+120ms:  Payment Service polls, picks up event, starts processing
// T+350ms:  Payment Service authorizes → publishes PaymentAuthorized
// T+400ms:  Inventory Service picks up OrderPlaced → reserves stock
// T+500ms:  Notification Service picks up OrderPlaced → sends email
//
// For 400ms, Inventory Service shows stale data!
// For 500ms, customer hasn't received confirmation email!
//
// This is FINE for most use cases. But you must design for it:

// Strategy 1: Optimistic UI — show "processing" immediately
// Strategy 2: Polling / WebSocket — client polls for final status
// Strategy 3: Correlation ID — track the saga across services

// Anti-pattern: Synchronous read-after-write
// ❌ POST /orders → publish event → immediately GET /inventory
//    (inventory hasn't processed the event yet!)
//
// ✅ POST /orders → return 202 Accepted with orderId
//    Client polls GET /orders/{id}/status until "confirmed"
Designing for eventual consistency:
  • Return 202 Accepted instead of 200 OK for async operations
  • Include a status endpoint clients can poll
  • Use correlation IDs to trace events across services
  • Idempotent operations — safe to retry if events are redelivered
  • Compensating transactions — if a downstream step fails, publish a compensating event
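
The 202-plus-status-endpoint approach can be simulated without a server. This is a minimal in-memory sketch with hypothetical names (`postOrder`, `getOrderStatus`, `drainBus`, and the `bus` array standing in for the broker); it shows the window in which the order exists but downstream processing has not finished.

```javascript
const orders = new Map(); // orderId -> { status, correlationId }
const bus = [];           // stand-in for the event broker

function postOrder(orderId) {
  const correlationId = `corr-${orderId}`;
  orders.set(orderId, { status: 'processing', correlationId });
  bus.push({ eventType: 'OrderPlaced', aggregateId: orderId, correlationId });
  // 202: accepted for processing, not yet complete.
  return { httpStatus: 202, orderId, statusUrl: `/orders/${orderId}/status` };
}

function getOrderStatus(orderId) {
  const order = orders.get(orderId);
  return order ? { httpStatus: 200, status: order.status } : { httpStatus: 404 };
}

// Downstream consumers run later, at their own pace.
function drainBus() {
  while (bus.length > 0) {
    const event = bus.shift();
    orders.get(event.aggregateId).status = 'confirmed';
  }
}

const response = postOrder('order-98765');
const before = getOrderStatus('order-98765'); // consumers have not run yet
drainBus();
const after = getOrderStatus('order-98765');

console.log(response.httpStatus, before.status, after.status);
```

The client never assumes the order is confirmed at the time of the POST; it follows `statusUrl` until the status changes.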

Event-Driven Flow in Action

An OrderPlaced event flows from the Order Service through an event bus to multiple independent consumers, each processing the event at its own pace.

[Animation: Event-Driven Flow. One event triggers parallel, independent processing across multiple services.]

Dead Letter Queues (DLQ)

When an event handler fails repeatedly, you don't want it to block the entire queue forever. A Dead Letter Queue is a separate topic/queue where failed events are sent after exhausting retries. Engineers can inspect, fix, and replay them.

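// Kafka DLQ Configuration: Spring Boot
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
           kafkaListenerContainerFactory(DefaultErrorHandler errorHandler) {

        // Spell out the type arguments: `var` with `<>` infers <Object, Object>
        var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
        factory.setConsumerFactory(consumerFactory());  // defined elsewhere (not shown)
        factory.setCommonErrorHandler(errorHandler);
        // Required for listeners that take an Acknowledgment parameter
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        // Retry 3 times with exponential backoff (1s, 2s, 4s), then send to DLQ.
        // ExponentialBackOffWithMaxRetries lives in org.springframework.kafka.support.
        var backoff = new ExponentialBackOffWithMaxRetries(3);
        backoff.setInitialInterval(1000L);
        backoff.setMultiplier(2.0);

        var recoverer = new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            // Route to DLQ topic: orders-topic.DLT, same partition number
            // (the DLT needs at least as many partitions as the source topic)
            (record, ex) -> new TopicPartition(
                record.topic() + ".DLT",
                record.partition()
            )
        );

        var handler = new DefaultErrorHandler(recoverer, backoff);

        // Don't retry on these: they'll never succeed
        handler.addNotRetryableExceptions(
            DeserializationException.class,
            ValidationException.class,
            NullPointerException.class
        );

        return handler;
    }
}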

// DLQ Monitor — alerting on failed events
@KafkaListener(topics = "orders-topic.DLT", groupId = "dlq-monitor")
public void handleDeadLetter(
    ConsumerRecord<String, byte[]> record,
    @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String errorMsg,
    @Header(KafkaHeaders.DLT_ORIGINAL_TOPIC) String originalTopic,
    @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset
) {
    log.error("DLQ event from {} offset {}: {}",
        originalTopic, originalOffset, errorMsg);

    // Alert ops team
    alertService.sendSlackAlert(
        "#order-alerts",
        "Failed event in DLQ: " + errorMsg +
        "\nOriginal topic: " + originalTopic +
        "\nOffset: " + originalOffset
    );

    // Store for later replay
    dlqRepository.save(DeadLetterRecord.builder()
        .originalTopic(originalTopic)
        .originalOffset(originalOffset)
        .errorMessage(errorMsg)
        .rawPayload(record.value())
        .createdAt(Instant.now())
        .status("PENDING_REVIEW")
        .build());
}
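
Stripped of framework details, the retry-then-dead-letter flow is a short loop. The sketch below is a broker-free illustration with hypothetical names (`processWithRetry`, `NonRetryableError`, the `deadLetters` array); it records the backoff delays it would wait instead of actually sleeping.

```javascript
class NonRetryableError extends Error {}

// Try the handler once plus up to `maxRetries` retries. On exhaustion, or on
// an error that can never succeed, park the event in the dead-letter list.
function processWithRetry(event, handler, options, deadLetters) {
  const { maxRetries, initialDelayMs, multiplier } = options;
  const delays = [];
  let delay = initialDelayMs;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      handler(event);
      return { status: 'processed', attempts: attempt + 1, delays };
    } catch (err) {
      const exhausted = attempt === maxRetries;
      if (err instanceof NonRetryableError || exhausted) {
        deadLetters.push({ event, error: err.message, attempts: attempt + 1 });
        return { status: 'dead-lettered', attempts: attempt + 1, delays };
      }
      delays.push(delay); // a real consumer would wait this long here
      delay *= multiplier;
    }
  }
}

const options = { maxRetries: 3, initialDelayMs: 1000, multiplier: 2 };
const deadLetters = [];

let failuresLeft = 2;
const flaky = () => { if (failuresLeft-- > 0) throw new Error('timeout'); };
const alwaysDown = () => { throw new Error('service unavailable'); };
const malformed = () => { throw new NonRetryableError('invalid payload'); };

const recovered = processWithRetry({ eventId: 'evt-1' }, flaky, options, deadLetters);
const exhausted = processWithRetry({ eventId: 'evt-2' }, alwaysDown, options, deadLetters);
const rejected = processWithRetry({ eventId: 'evt-3' }, malformed, options, deadLetters);

console.log(recovered, exhausted, rejected, deadLetters.length);
```

The transient failure recovers on the third attempt, the persistent one is dead-lettered after the retries run out, and the malformed event skips retries entirely, so the main flow is never blocked by a poison message.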

Idempotent Event Handlers

In distributed systems, events can be delivered more than once — network retries, consumer rebalancing, at-least-once delivery semantics. Every event handler must be idempotent: processing the same event twice produces the same result as processing it once.

// ❌ Non-idempotent handler — processes payment twice!
async function handleOrderPlaced(event) {
  await chargeCustomer(event.data.customerId, event.data.totalAmount);
  // If this event is redelivered, customer is charged TWICE
}

// ✅ Idempotent handler: uses event ID for deduplication
async function handleOrderPlaced(event) {
  await db.transaction(async (tx) => {
    // Claim the event ID inside the transaction. The primary key on
    // event_id makes the claim atomic, so two concurrent deliveries of
    // the same event cannot both pass the check.
    const claim = await tx.query(
      `INSERT INTO processed_events (event_id, processed_at)
       VALUES ($1, NOW())
       ON CONFLICT (event_id) DO NOTHING`,
      [event.eventId]
    );
    if (claim.rowCount === 0) {
      console.log(`Event ${event.eventId} already processed, skipping`);
      return; // Idempotent: safe to skip
    }

    // Do the actual work; it commits or rolls back together with the claim
    await tx.query(
      'INSERT INTO payments (order_id, amount, status) VALUES ($1, $2, $3)',
      [event.aggregateId, event.data.totalAmount, 'authorized']
    );
  });
}

// Alternative: Use natural idempotency keys
// Instead of a separate dedup table, rely on a unique constraint
// (here: UNIQUE (order_id, sku) on the reservations table):
async function handleInventoryReserved(event) {
  // ON CONFLICT DO NOTHING turns a redelivered event into a no-op,
  // so duplicates never raise a unique-violation error (23505).
  await db.query(
    `INSERT INTO reservations (order_id, sku, quantity, reserved_at)
     VALUES ($1, $2, $3, NOW())
     ON CONFLICT (order_id, sku) DO NOTHING`,
    [event.aggregateId, event.data.sku, event.data.quantity]
  );
}

-- Processed events table, with periodic cleanup:
CREATE TABLE processed_events (
    event_id     VARCHAR(64) PRIMARY KEY,
    processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Delete entries older than 7 days. The window must be at least as long as
-- the broker's retention (7 days for orders-topic); otherwise a replayed
-- event could be processed a second time. Run as a cron job:
DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '7 days';

Event Ordering Challenges

A global order across all events is impractical at scale: it would require a single partition, which rules out parallelism. Instead, brokers such as Kafka guarantee ordering only within a partition.

// Kafka ordering guarantees:
// ✅ Within a partition: Events are strictly ordered
// ❌ Across partitions: No ordering guarantee

// Problem: Two events for the same order land on different partitions
// → OrderPlaced and OrderCancelled arrive out of order!

// Solution: Use the aggregate ID as the partition key
// All events for order-98765 go to the SAME partition

producer.send(new ProducerRecord<>(
    "orders-topic",
    order.getId(),     // KEY: order-98765 → always same partition
    orderEvent         // VALUE: the event
));

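// Partition assignment (default partitioner): murmur2(keyBytes) % numPartitions
// e.g. order-98765 → partition 7 (illustrative value)
// All events for this order land on that one partition, in order.
// Adding partitions later changes the mapping for existing keys.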

// ───── Ordering across different aggregates ─────
// What if you need: "OrderPlaced THEN PaymentAuthorized"?
// These are from DIFFERENT services producing to DIFFERENT topics.
// NO broker guarantees this ordering.

// Solution: Causal ordering via event metadata
{
  "eventId": "evt-pay-001",
  "eventType": "PaymentAuthorized",
  "causationId": "evt-ord-001",    // The event that caused this
  "correlationId": "saga-98765",    // Groups all events in a saga
  "timestamp": "2026-04-15T10:30:05Z"
}

// Consumer can rebuild causal order:
// 1. Buffer out-of-order events
// 2. Process only when causation event has been seen
// 3. Use a saga orchestrator to enforce ordering
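
Steps 1 and 2 of the list above can be sketched as a small buffer keyed by `causationId`. This is an illustrative in-memory version (the `CausalBuffer` class is hypothetical): events whose cause has not been processed yet are parked and released as soon as the cause arrives.

```javascript
class CausalBuffer {
  constructor(handler) {
    this.handler = handler;
    this.seen = new Set();    // IDs of events already processed
    this.waiting = new Map(); // causationId -> events blocked on that ID
  }

  receive(event) {
    const cause = event.causationId;
    if (cause && !this.seen.has(cause)) {
      // Cause not processed yet: park the event.
      const parked = this.waiting.get(cause) || [];
      parked.push(event);
      this.waiting.set(cause, parked);
      return;
    }
    this.process(event);
  }

  process(event) {
    this.handler(event);
    this.seen.add(event.eventId);
    // Release anything that was waiting on this event.
    const released = this.waiting.get(event.eventId) || [];
    this.waiting.delete(event.eventId);
    for (const next of released) this.process(next);
  }
}

const processed = [];
const buffer = new CausalBuffer((event) => processed.push(event.eventType));

// Events arrive in reverse causal order.
buffer.receive({ eventId: 'evt-ship-001', eventType: 'OrderShipped', causationId: 'evt-pay-001' });
buffer.receive({ eventId: 'evt-pay-001', eventType: 'PaymentAuthorized', causationId: 'evt-ord-001' });
buffer.receive({ eventId: 'evt-ord-001', eventType: 'OrderPlaced' });

console.log(processed);
```

The handler sees the events in causal order even though they arrived reversed. The sketch keeps parked events in memory without limit; a real consumer would also need a timeout or dead-letter path for events whose cause never arrives.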

The Outbox Pattern

The most dangerous problem in event-driven systems: dual writes. When a service writes to its database AND publishes an event, either can fail independently, leaving the system in an inconsistent state.

// ❌ The dual-write problem:
async function placeOrder(order) {
  await db.insert('orders', order);       // Step 1: DB write succeeds
  await kafka.publish('OrderPlaced', order); // Step 2: Kafka publish FAILS!
  // DB has the order, but no event was published
  // Other services never learn about this order!
}

// Even reversing the order doesn't help:
async function placeOrder(order) {
  await kafka.publish('OrderPlaced', order); // Step 1: Kafka publish succeeds
  await db.insert('orders', order);       // Step 2: DB write FAILS!
  // Event was published, but order doesn't exist in DB
  // Other services act on a phantom order!
}

// ✅ The Outbox Pattern — single atomic write
async function placeOrder(order) {
  await db.transaction(async (tx) => {
    // Write the order
    await tx.insert('orders', order);

    // Write the event to an outbox table IN THE SAME TRANSACTION
    await tx.insert('outbox_events', {
      id: uuid(),
      aggregate_type: 'Order',
      aggregate_id: order.id,
      event_type: 'OrderPlaced',
      payload: JSON.stringify({
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        totalAmount: order.totalAmount
      }),
      created_at: new Date(),
      published: false
    });
  });
  // If TX commits → both order AND event are saved atomically
  // If TX fails → neither is saved
}
-- Outbox table schema
CREATE TABLE outbox_events (
    id             UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB NOT NULL,
    created_at     TIMESTAMP NOT NULL DEFAULT NOW(),
    published      BOOLEAN NOT NULL DEFAULT FALSE,
    published_at   TIMESTAMP,
    retry_count    INT NOT NULL DEFAULT 0
);

CREATE INDEX idx_outbox_unpublished
    ON outbox_events (created_at)
    WHERE published = FALSE;

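-- Polling-based relay (simple approach):
-- A background worker polls the outbox and publishes to Kafka.
-- Steps 1-3 run inside ONE transaction: the FOR UPDATE row locks are
-- released at COMMIT, so they only protect the batch while it is open.
-- With several workers, batches interleave, so run a single worker if
-- per-aggregate ordering matters.

BEGIN;

-- 1. Fetch unpublished events (batch of 100)
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE published = FALSE
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED;  -- Skip rows being processed by other workers

-- 2. Publish each to Kafka (a crash before step 3 re-sends the batch:
--    delivery is at-least-once, so consumers must be idempotent)

-- 3. Mark as published
UPDATE outbox_events
SET published = TRUE, published_at = NOW()
WHERE id IN (...published ids...);

COMMIT;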

-- 4. Cleanup old published events (keep 7 days for debugging)
DELETE FROM outbox_events
WHERE published = TRUE AND published_at < NOW() - INTERVAL '7 days';
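
The relay loop itself can be simulated in memory to show its failure behavior. This sketch uses hypothetical stand-ins (the `outbox` array for the table, `brokerLog` for the topic, and a `publish` function that fails once); it is not tied to any database or Kafka client.

```javascript
// In-memory stand-ins for the outbox table and the broker.
const outbox = [
  { id: 1, event_type: 'OrderPlaced', aggregate_id: 'order-1', published: false },
  { id: 2, event_type: 'OrderPlaced', aggregate_id: 'order-2', published: false },
  { id: 3, event_type: 'OrderCancelled', aggregate_id: 'order-1', published: false },
];
const brokerLog = [];
let failOnId = 2; // simulate one broker outage

function publish(row) {
  if (row.id === failOnId) {
    failOnId = null; // fail only once
    throw new Error('broker unavailable');
  }
  brokerLog.push(row.id);
}

// One relay pass: publish pending rows oldest-first and mark each row only
// after its publish succeeded. Stops at the first failure so that later
// events never overtake an earlier one.
function relayOnce(batchSize = 100) {
  const pending = outbox.filter((row) => !row.published).slice(0, batchSize);
  for (const row of pending) {
    try {
      publish(row);
    } catch (err) {
      row.retry_count = (row.retry_count || 0) + 1;
      return false; // retry on the next poll
    }
    row.published = true;
  }
  return true;
}

const firstPass = relayOnce();  // publishes row 1, fails on row 2
const secondPass = relayOnce(); // publishes rows 2 and 3

console.log(firstPass, secondPass, brokerLog);
```

No event is lost across the simulated outage, and the original order is preserved. If the relay crashed after `publish` but before marking the row, the next pass would send that event again, which is why consumers still need idempotent handlers.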

[Animation: Outbox Pattern. The outbox pattern guarantees reliable event publishing by using a single database transaction.]

Change Data Capture (CDC) with Debezium

Instead of polling the outbox table, use Change Data Capture to stream database changes directly to Kafka. Debezium reads the database's transaction log (WAL in PostgreSQL, binlog in MySQL) and publishes change events — no polling, no missed events, near real-time.

// Debezium Connector Configuration — PostgreSQL → Kafka
// Register via Kafka Connect REST API:

POST http://kafka-connect:8083/connectors
{
  "name": "order-service-outbox-connector",
  "config": {
    // Connector class
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

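    // Database connection
    "database.hostname": "order-db.internal",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "${secrets.DB_PASSWORD}",
    "database.dbname": "order_service",
    // Logical server name, used as the topic prefix for change events
    // (Debezium 2.x renames this property to "topic.prefix")
    "database.server.name": "order-db",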

    // WAL (Write-Ahead Log) configuration
    "plugin.name": "pgoutput",
    "slot.name": "debezium_order_slot",
    "publication.name": "dbz_publication",

    // Capture only the outbox table
    "table.include.list": "public.outbox_events",

    // Outbox Event Router — transforms outbox rows into proper events
    "transforms": "outbox",
    "transforms.outbox.type":
        "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.fields.additional.placement":
        "aggregate_type:header:aggregateType",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events",

    // Route to topic based on aggregate_type:
    // aggregate_type = "Order" → topic = "Order.events"
    // aggregate_type = "Payment" → topic = "Payment.events"
    "transforms.outbox.route.by.field": "aggregate_type",

    // Expand the string payload into structured JSON in the emitted event
    // (Debezium does not delete outbox rows; clean them up separately)
    "transforms.outbox.table.expand.json.payload": true
  }
}

// ─── How it works under the hood ───
//
// 1. Order Service writes to orders + outbox_events in one TX
//    ┌─────────────────────────────────────┐
//    │ BEGIN TRANSACTION                    │
//    │   INSERT INTO orders (...)           │
//    │   INSERT INTO outbox_events (...)    │
//    │ COMMIT                               │
//    └───────────┬─────────────────────────┘
//                │
// 2. PostgreSQL writes to WAL (Write-Ahead Log)
//                │
//    ┌───────────▼─────────────────────────┐
//    │ WAL: INSERT outbox_events           │
//    │   id=evt-001, type=OrderPlaced      │
//    │   aggregate_id=order-98765          │
//    │   payload={...}                     │
//    └───────────┬─────────────────────────┘
//                │
// 3. Debezium reads WAL via logical replication
//                │
//    ┌───────────▼─────────────────────────┐
//    │ Debezium EventRouter transforms     │
//    │ outbox row → Kafka event            │
//    │ Topic: Order.events                 │
//    │ Key: order-98765                    │
//    │ Value: {eventType: OrderPlaced,...}  │
//    └───────────┬─────────────────────────┘
//                │
// 4. Published to Kafka topic
//                ▼
//    ┌─────────────────────────────────────┐
//    │ Kafka: Order.events                 │
//    │ Partition 7 (by key order-98765)    │
//    │ Consumers: payment, inventory, etc. │
//    └─────────────────────────────────────┘
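
The routing step in the diagram above comes down to a small mapping from outbox columns to a Kafka record. Here's a rough sketch of that mapping. It only illustrates what the config asks for and is not Debezium's implementation: RoutedEvent and OutboxRouting are hypothetical names, and the header names ("id", "aggregateType") are assumptions based on the config shown earlier.

```java
import java.util.Map;

// Hypothetical value type for the routed record; not a Debezium class.
final class RoutedEvent {
    final String topic;
    final String key;
    final String value;
    final Map<String, String> headers;

    RoutedEvent(String topic, String key, String value, Map<String, String> headers) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }
}

final class OutboxRouting {
    // Same template as route.topic.replacement in the connector config
    static final String TOPIC_TEMPLATE = "${routedByValue}.events";

    static RoutedEvent route(String eventId, String aggregateType,
                             String aggregateId, String payloadJson) {
        // route.by.field = aggregate_type, so its value fills the template
        String topic = TOPIC_TEMPLATE.replace("${routedByValue}", aggregateType);
        return new RoutedEvent(
            topic,
            aggregateId,   // record key: keeps one aggregate's events on one partition
            payloadJson,   // record value: the payload column
            Map.of("id", eventId, "aggregateType", aggregateType));
    }
}
```

Calling OutboxRouting.route("evt-001", "Order", "order-98765", "{}") yields topic "Order.events" with key "order-98765", matching the diagram. Keying by aggregate ID is what keeps events for one order in order within a partition.
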
# Debezium connector status check:
curl http://kafka-connect:8083/connectors/order-service-outbox-connector/status
{
  "name": "order-service-outbox-connector",
  "connector": { "state": "RUNNING", "worker_id": "connect-1:8083" },
  "tasks": [
    { "id": 0, "state": "RUNNING", "worker_id": "connect-1:8083" }
  ]
}

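# Monitor CDC lag: ensure Debezium keeps up with DB writes.
# Metric: debezium_postgres_streaming_lag_in_bytes
#   (the exact name depends on your JMX exporter mapping; Debezium itself
#    exposes MilliSecondsBehindSource on its streaming metrics MBean)
# Alert if lag > 10MB for > 5 minutes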

# PostgreSQL replication slot monitoring:
SELECT slot_name, active, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
    AS lag_bytes
FROM pg_replication_slots
WHERE slot_name = 'debezium_order_slot';

# ┌─────────────────────┬────────┬───────────┐
# │ slot_name           │ active │ lag_bytes │
# ├─────────────────────┼────────┼───────────┤
# │ debezium_order_slot │ t      │ 1024      │  ← Healthy: ~1KB lag
# └─────────────────────┴────────┴───────────┘
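
The alert rule in the comments above (lag > 10MB for > 5 minutes) is a sustained-threshold check: a single spike should not page anyone. Here's a minimal sketch, assuming lag samples come from the replication slot query. SlotLagAlert is a hypothetical name, and 10MB is taken to mean 10 * 1024 * 1024 bytes.

```java
import java.time.Duration;
import java.time.Instant;

// Fires only when lag stays above the threshold for longer than the window.
final class SlotLagAlert {
    private final long thresholdBytes;
    private final Duration sustainedFor;
    private Instant breachedSince;   // null while lag is at or below the threshold

    SlotLagAlert(long thresholdBytes, Duration sustainedFor) {
        this.thresholdBytes = thresholdBytes;
        this.sustainedFor = sustainedFor;
    }

    /** Feeds one lag sample; returns true if the alert should fire now. */
    boolean observe(Instant sampledAt, long lagBytes) {
        if (lagBytes <= thresholdBytes) {
            breachedSince = null;          // recovered: reset the window
            return false;
        }
        if (breachedSince == null) {
            breachedSince = sampledAt;     // first sample above the threshold
        }
        return Duration.between(breachedSince, sampledAt).compareTo(sustainedFor) > 0;
    }
}
```

A worker could call observe once per scrape with the lag_bytes value from the query. Resetting the window on recovery means a brief catch-up clears the alert state.
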
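Outbox + CDC vs. Polling:

| Aspect      | Polling                                      | CDC (Debezium)                                         |
|-------------|----------------------------------------------|--------------------------------------------------------|
| Latency     | Poll interval (1–5s)                         | Near real-time (~100ms)                                |
| DB load     | Constant queries                             | Reads WAL (minimal)                                    |
| Complexity  | Simple to implement                          | Requires Kafka Connect infra                           |
| Reliability | At-least-once; a crash can republish a batch | WAL-based, no missed commits; at-least-once by default |

Both approaches can deliver duplicates, so consumers still need an idempotency check.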

Real-World: E-Commerce Order Processing Pipeline

Let's put it all together. Here's a complete event-driven order processing pipeline, from the moment a customer clicks "Place Order" to the shipping notification.

// ═══════════════════════════════════════════════════════════════════
// E-Commerce Order Processing Pipeline — Full Event Flow
// ═══════════════════════════════════════════════════════════════════

// ─── Step 1: Order Service — Accepts the order ───
// API Gateway → Order Service

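@PostMapping("/api/orders")
@Transactional(rollbackFor = Exception.class)  // also roll back on checked exceptions
public ResponseEntity<OrderResponse> placeOrder(@RequestBody CreateOrderRequest req)
        throws JsonProcessingException {  // thrown by objectMapper.writeValueAsString
    // Validate request (a synchronous lookup; in production, do this before
    // opening the transaction so no DB connection is held during the remote call)
    var customer = customerClient.getCustomer(req.getCustomerId());
    if (customer == null) throw new CustomerNotFoundException(req.getCustomerId());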

    // Create order in DB
    var order = Order.builder()
        .id(OrderId.generate())
        .customerId(req.getCustomerId())
        .items(req.getItems().stream()
            .map(i -> OrderItem.of(i.getProductId(), i.getQuantity(), i.getPrice()))
            .toList())
        .status(OrderStatus.PENDING)
        .totalAmount(calculateTotal(req.getItems()))
        .createdAt(Instant.now())
        .build();

    orderRepository.save(order);

    // Write to outbox (same transaction!)
    outboxRepository.save(OutboxEvent.builder()
        .aggregateType("Order")
        .aggregateId(order.getId().toString())
        .eventType("OrderPlaced")
        .payload(objectMapper.writeValueAsString(OrderPlacedPayload.from(order)))
        .build());

    // Return 202 Accepted — processing is async from here
    return ResponseEntity.accepted()
        .body(new OrderResponse(order.getId(), "PENDING",
            "Order received, processing..."));
}

// Debezium CDC captures outbox insert → publishes to Order.events topic
// ─── Step 2: Payment Service — Authorizes payment ───
// Consumes: Order.events (OrderPlaced)
// Produces: Payment.events (PaymentAuthorized / PaymentFailed)

@KafkaListener(topics = "Order.events", groupId = "payment-service")
public void handleOrderEvent(OrderEvent event) {
    if (!"OrderPlaced".equals(event.getEventType())) return;

    // Idempotency check
    if (processedEventRepo.existsById(event.getEventId())) {
        log.info("Event {} already processed", event.getEventId());
        return;
    }

    var payload = event.getPayloadAs(OrderPlacedPayload.class);

    try {
        // Call payment gateway (Stripe, etc.)
        var charge = stripeClient.createCharge(
            payload.getCustomerId(),
            payload.getTotalAmount(),
            payload.getCurrency(),
            Map.of("orderId", payload.getOrderId())
        );

        // Save payment record + outbox event atomically
        transactionTemplate.execute(status -> {
            paymentRepository.save(Payment.builder()
                .orderId(payload.getOrderId())
                .chargeId(charge.getId())
                .amount(payload.getTotalAmount())
                .status(PaymentStatus.AUTHORIZED)
                .build());

            outboxRepository.save(OutboxEvent.builder()
                .aggregateType("Payment")
                .aggregateId(payload.getOrderId())
                .eventType("PaymentAuthorized")
                .payload(toJson(new PaymentAuthorizedPayload(
                    payload.getOrderId(), charge.getId(),
                    payload.getTotalAmount())))
                .build());

            processedEventRepo.save(new ProcessedEvent(event.getEventId()));
            return null;
        });

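    } catch (PaymentDeclinedException e) {
        // Publish the failure event and record the event as processed in one
        // transaction, so a redelivery does not retry the declined charge
        transactionTemplate.execute(status -> {
            outboxRepository.save(OutboxEvent.builder()
                .aggregateType("Payment")
                .aggregateId(payload.getOrderId())
                .eventType("PaymentFailed")
                .payload(toJson(new PaymentFailedPayload(
                    payload.getOrderId(), e.getDeclineReason())))
                .build());
            processedEventRepo.save(new ProcessedEvent(event.getEventId()));
            return null;
        });
    }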
}
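
One gap in the handler above: the charge is made before the database transaction commits. If the service crashes in between, the event is redelivered, the idempotency check finds nothing, and the customer could be charged twice. A common mitigation is to derive an idempotency key from the event ID and pass it to the gateway (Stripe's API, for example, accepts idempotency keys). Here's a minimal sketch of the idea, assuming a hypothetical in-memory FakeGateway and PaymentHandler. Neither is the real Stripe client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical gateway that deduplicates on an idempotency key.
final class FakeGateway {
    private final Map<String, String> chargeIdByKey = new HashMap<>();
    private int chargesCreated = 0;

    /** Returns the existing charge for a known key, otherwise creates one. */
    String charge(String idempotencyKey, long amountCents) {
        return chargeIdByKey.computeIfAbsent(idempotencyKey, key -> {
            chargesCreated++;
            return "ch-" + chargesCreated;
        });
    }

    int chargesCreated() {
        return chargesCreated;
    }
}

final class PaymentHandler {
    private final FakeGateway gateway;

    PaymentHandler(FakeGateway gateway) {
        this.gateway = gateway;
    }

    // Derived from the event ID, so every redelivery reuses the same key
    static String idempotencyKey(String eventId) {
        return "payment:" + eventId;
    }

    /** Handles one OrderPlaced delivery and returns the charge ID. */
    String handleOrderPlaced(String eventId, long amountCents) {
        return gateway.charge(idempotencyKey(eventId), amountCents);
    }
}
```

With this in place, handling the same event twice returns the same charge ID and creates only one charge. The local processed-events table then becomes an optimization rather than the only line of defense.
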
// ─── Step 3: Inventory Service — Reserves stock ───
// Consumes: Order.events (OrderPlaced)
// Produces: Inventory.events (InventoryReserved / InsufficientStock)

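@KafkaListener(topics = "Order.events", groupId = "inventory-service")
public void handleOrderPlaced(OrderEvent event) {
    if (!"OrderPlaced".equals(event.getEventType())) return;

    var payload = event.getPayloadAs(OrderPlacedPayload.class);

    // Reserve every item in one transaction (transactionTemplate is assumed to
    // be an injected Spring TransactionTemplate, as in the Payment Service).
    // On the first shortfall, roll back so no partial reservation is left behind.
    var shortItem = transactionTemplate.execute(status -> {
        for (var item : payload.getItems()) {
            int updated = jdbcTemplate.update(
                """
                UPDATE inventory
                SET reserved_qty = reserved_qty + ?,
                    available_qty = available_qty - ?
                WHERE sku = ? AND available_qty >= ?
                """,
                item.getQuantity(), item.getQuantity(),
                item.getSku(), item.getQuantity()
            );

            if (updated == 0) {
                status.setRollbackOnly();
                return item;
            }
        }

        // All items reserved: publish in the same transaction as the updates
        // (assuming publishEvent writes to the outbox table)
        publishEvent("InventoryReserved", payload.getOrderId(),
            new InventoryReservedPayload(payload.getOrderId(), payload.getItems()));
        return null;
    });

    if (shortItem != null) {
        // Insufficient stock: publish compensating event after the rollback
        publishEvent("InsufficientStock", payload.getOrderId(),
            new InsufficientStockPayload(shortItem.getSku(), shortItem.getQuantity()));
    }
}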

// ─── Step 4: Notification Service — Sends confirmation ───
// Consumes: Order.events, Payment.events, Shipping.events

@KafkaListener(
    topics = {"Order.events", "Payment.events", "Shipping.events"},
    groupId = "notification-service"
)
public void handleEvent(GenericEvent event) {
    switch (event.getEventType()) {
        case "OrderPlaced" -> sendEmail(
            event.getData("customerEmail"),
            "Order Confirmed",
            "orderConfirmation",
            Map.of("orderId", event.getAggregateId(),
                   "items", event.getData("items"))
        );
        case "PaymentAuthorized" -> sendEmail(
            lookupEmail(event.getAggregateId()),
            "Payment Received",
            "paymentReceipt",
            Map.of("amount", event.getData("amount"),
                   "chargeId", event.getData("chargeId"))
        );
        case "OrderShipped" -> sendEmailAndSms(
            lookupContact(event.getAggregateId()),
            "Your Order Has Shipped!",
            "shippingNotification",
            Map.of("trackingId", event.getData("trackingId"),
                   "carrier", event.getData("carrier"),
                   "estimatedDelivery", event.getData("eta"))
        );
    }
}
// ─── Complete Event Flow Timeline ───
//
// T+0ms      Customer clicks "Place Order"
//            │
// T+50ms     Order Service: validate → save order + outbox → return 202
//            │
// T+150ms    Debezium CDC: reads WAL → publishes OrderPlaced to Kafka
//            │
//            ├──→ Payment Service (group: payment-service)
//            │    T+300ms  Calls Stripe → PaymentAuthorized
//            │
//            ├──→ Inventory Service (group: inventory-service)
//            │    T+250ms  Reserves stock → InventoryReserved
//            │
//            └──→ Notification Service (group: notification-service)
//                 T+400ms  Sends order confirmation email
//
// T+500ms    Payment.events: PaymentAuthorized
//            │
//            ├──→ Order Service: updates order status to PAID
//            └──→ Notification Service: sends payment receipt
//
// T+600ms    Inventory.events: InventoryReserved
//            │
//            └──→ Shipping Service: creates shipment
//                 T+800ms  Carrier API → OrderShipped
//
// T+900ms    Shipping.events: OrderShipped
//            │
//            ├──→ Order Service: updates status to SHIPPED
//            └──→ Notification Service: sends tracking email + SMS
//
// ═══════════════════════════════════════════════════════════════
// Total: ~900ms from click to all services updated
// Each service processed independently, no blocking calls
// If Payment Service was slow (2s), Inventory and Notification
// still processed at T+250ms and T+400ms respectively
// ═══════════════════════════════════════════════════════════════
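
One caveat about the timeline above: Shipping reacts to InventoryReserved alone. If payment is slow or declined, a shipment could be created for an unpaid order. One way to guard against that is to hold the shipment until both PaymentAuthorized and InventoryReserved have arrived for the same order. Here's a minimal in-memory sketch of such a gate. Prerequisite and ShipmentGate are hypothetical names, and a real service would persist this state rather than keep it in maps.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// The events Shipping waits for before creating a shipment.
enum Prerequisite { PAYMENT_AUTHORIZED, INVENTORY_RESERVED }

final class ShipmentGate {
    private final Map<String, Set<Prerequisite>> seenByOrder = new HashMap<>();
    private final Set<String> released = new HashSet<>();

    /**
     * Records one event for an order. Returns true exactly once per order:
     * when the last missing prerequisite arrives. Duplicates return false.
     */
    boolean record(String orderId, Prerequisite event) {
        if (released.contains(orderId)) {
            return false;                  // redelivery after the shipment was released
        }
        Set<Prerequisite> seen = seenByOrder.computeIfAbsent(
            orderId, id -> EnumSet.noneOf(Prerequisite.class));
        seen.add(event);
        if (seen.size() < Prerequisite.values().length) {
            return false;                  // still waiting for the other event
        }
        seenByOrder.remove(orderId);
        released.add(orderId);
        return true;
    }
}
```

The Shipping Service would call record from both its Payment.events and Inventory.events listeners and create the shipment only when it returns true. Because the gate does not care which event arrives first, a slow payment delays shipping without blocking inventory or notifications.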

Key Takeaways