High Level Design Series · Architecture Patterns · Part 6 · Post 42 of 70

Data Pipelines & Stream Processing

Batch vs Stream Processing

Data processing falls into two fundamental paradigms. The choice between them dictates your architecture, latency budget, infrastructure cost, and operational complexity.

| Dimension | Batch Processing | Stream Processing |
|---|---|---|
| Data model | Bounded datasets (files, tables) | Unbounded event streams |
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high (optimized for bulk) | High (per-event overhead) |
| State management | Implicit (full dataset available) | Explicit (state stores, checkpoints) |
| Fault tolerance | Restart the job | Checkpoints, exactly-once guarantees |
| Scheduling | Cron, Airflow, Dagster | Always-on, event-driven |
| Cost model | Pay per run (can use spot instances) | Continuous resource allocation |
| Examples | Spark batch, Hive, MapReduce, dbt | Flink, Kafka Streams, Spark Streaming |
When to choose which?
  • Batch — nightly analytics, ML model training, data warehouse loading, historical reprocessing
  • Stream — fraud detection, real-time dashboards, event-driven microservices, anomaly detection
  • Micro-batch — compromise (Spark Structured Streaming): near-real-time with batch-like simplicity
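
The difference in state handling can be shown in a few lines of framework-free Python (a minimal sketch; names and data are illustrative): batch computes over the full bounded dataset at once, while streaming folds each event into explicit running state as it arrives.

```python
def batch_total(events):
    # Batch: the whole dataset is available, aggregate in one pass
    return sum(e["amount"] for e in events)

class StreamingTotal:
    # Streaming: state is explicit and updated per event
    def __init__(self):
        self.state = 0.0

    def on_event(self, event):
        self.state += event["amount"]
        return self.state  # emit an updated result immediately

events = [{"amount": 10.0}, {"amount": 5.0}, {"amount": 7.5}]

print(batch_total(events))              # 22.5, available only after the run
s = StreamingTotal()
print([s.on_event(e) for e in events])  # [10.0, 15.0, 22.5], one result per event
```

The batch answer appears only when the job finishes; the streaming version emits a partial answer after every event, which is why its state must survive failures (checkpoints) rather than being implicit in the input files.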

Time Semantics: The Core Distinction

In streaming, when an event happened, when it entered the system, and when it is processed are three different things:

Event Time:      The timestamp embedded in the event (when it actually occurred)
Processing Time: The wall-clock time when the system processes the event
Ingestion Time:  The timestamp when the event enters the streaming system

Example:
  User clicks "Buy" at 14:00:01.000 (event time)
  Event reaches Kafka at 14:00:01.200 (ingestion time)
  Flink processes it at 14:00:03.500 (processing time)
  
  The 2.5-second gap matters for:
  - Late-arriving events (mobile users with flaky connections)
  - Out-of-order events (distributed producers)
  - Cross-timezone aggregations
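
A toy illustration of why the distinction matters: bucketing the same three events by event time vs processing time yields different per-minute counts. The timestamps below are made up for the example.

```python
from datetime import datetime

# Hypothetical events as (event_time, processing_time) pairs; the second
# event is delayed and arrives after the 14:01 minute has begun.
events = [
    (datetime(2024, 1, 1, 14, 0, 59), datetime(2024, 1, 1, 14, 1, 0)),
    (datetime(2024, 1, 1, 14, 0, 58), datetime(2024, 1, 1, 14, 1, 5)),   # late arrival
    (datetime(2024, 1, 1, 14, 1, 10), datetime(2024, 1, 1, 14, 1, 11)),
]

def count_per_minute(events, use_event_time):
    counts = {}
    for event_time, processing_time in events:
        ts = event_time if use_event_time else processing_time
        key = ts.strftime("%H:%M")          # 1-minute tumbling bucket
        counts[key] = counts.get(key, 0) + 1
    return counts

print(count_per_minute(events, True))   # {'14:00': 2, '14:01': 1}  (what really happened)
print(count_per_minute(events, False))  # {'14:01': 3}              (what the system saw)
```

Event-time windowing recovers the true per-minute activity; processing-time windowing lumps the late event into the wrong bucket.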

ETL vs ELT

Two philosophies for moving data from source systems into analytics:

ETL — Extract, Transform, Load

Transform data before loading into the destination. The traditional approach from the data warehouse era.

# Classic ETL with Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as spark_sum, when

spark = SparkSession.builder.appName("etl-orders").getOrCreate()

# 1. EXTRACT — read raw data from source
raw_orders = (spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://oltp-db:5432/production")
    .option("dbtable", "orders")
    .option("user", "etl_reader")
    .load())

raw_products = spark.read.parquet("s3://data-lake/products/")

# 2. TRANSFORM — clean, join, aggregate BEFORE loading
cleaned_orders = (raw_orders
    .filter(col("status") != "CANCELLED")
    .filter(col("amount") > 0)
    .withColumn("order_date", to_date(col("created_at")))
    .withColumn("is_high_value", when(col("amount") > 1000, True).otherwise(False))
    .join(raw_products, "product_id", "left")
    .select("order_id", "user_id", "product_name", "category",
            "amount", "order_date", "is_high_value", "region"))

daily_revenue = (cleaned_orders
    .groupBy("order_date", "category", "region")
    .agg(spark_sum("amount").alias("total_revenue"),
         spark_sum(when(col("is_high_value"), 1).otherwise(0)).alias("high_value_count")))

# 3. LOAD — write transformed data to warehouse
daily_revenue.write.mode("overwrite").partitionBy("order_date").parquet(
    "s3://warehouse/daily_revenue/"
)

ELT — Extract, Load, Transform

Load raw data first, transform inside the destination. Powered by modern cloud warehouses (BigQuery, Snowflake, Redshift) with cheap storage and powerful compute.

-- ELT: Raw data is already loaded into the warehouse via Fivetran/Airbyte/Stitch.
-- Transformation happens inside the warehouse using dbt (data build tool).

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('production', 'raw_orders') }}
),
cleaned AS (
    SELECT
        order_id,
        user_id,
        product_id,
        amount,
        CAST(created_at AS DATE)       AS order_date,
        status,
        CASE WHEN amount > 1000 THEN TRUE ELSE FALSE END AS is_high_value
    FROM source
    WHERE status != 'CANCELLED'
      AND amount > 0
)
SELECT * FROM cleaned;

-- models/marts/daily_revenue.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),
products AS (
    SELECT * FROM {{ ref('stg_products') }}
)
SELECT
    o.order_date,
    p.category,
    p.region,
    SUM(o.amount)                                         AS total_revenue,
    COUNT(CASE WHEN o.is_high_value THEN 1 END)           AS high_value_count,
    COUNT(*)                                               AS order_count
FROM orders o
LEFT JOIN products p ON o.product_id = p.product_id
GROUP BY 1, 2, 3

| Aspect | ETL | ELT |
|---|---|---|
| Transform location | External (Spark, Airflow, custom code) | Inside the warehouse (SQL, dbt) |
| Raw data retained? | No (only transformed data loaded) | Yes (raw data always available) |
| Flexibility | New transforms need pipeline changes | New transforms = new SQL model |
| Debugging | Hard (intermediate data discarded) | Easy (query raw data anytime) |
| Best for | Sensitive data, heavy transformations | Analytics, iterative exploration |

Lambda Architecture

Proposed by Nathan Marz, Lambda architecture combines batch and stream processing to provide comprehensive, low-latency views of data. It acknowledges that batch processing gives accurate results but high latency, while stream processing gives low latency but approximate results.

Three Layers

  1. Batch Layer — Stores the master dataset (immutable, append-only). Periodically runs batch jobs (Spark, Hadoop) to pre-compute batch views. High latency (hours) but perfectly accurate.
  2. Speed Layer — Processes only the most recent data in real-time (Flink, Storm, Kafka Streams). Produces real-time views that compensate for the batch layer's latency. Lower accuracy (approximations, at-least-once).
  3. Serving Layer — Merges batch views and real-time views to answer queries. When a new batch view is available, the corresponding real-time view can be discarded.
# Lambda Architecture — Conceptual Data Flow
#
# ┌─────────────┐
# │ Data Source │ ─── All Events ──┬──────────────────────────────┐
# └─────────────┘                  │                              │
#                                  ▼                              ▼
#                         ┌──────────────────┐           ┌──────────────────┐
#                         │   Batch Layer    │           │   Speed Layer    │
#                         │ (Master Dataset  │           │ (Real-time only) │
#                         │  + Spark Jobs)   │           │ Flink / KStreams │
#                         └────────┬─────────┘           └────────┬─────────┘
#                                  │                              │
#                                  ▼                              ▼
#                         ┌──────────────────┐           ┌──────────────────┐
#                         │   Batch Views    │           │  Real-time Views │
#                         │  (pre-computed)  │           │   (incremental)  │
#                         └────────┬─────────┘           └────────┬─────────┘
#                                  │                              │
#                                  └───────────────┬──────────────┘
#                                                  ▼
#                                        ┌──────────────────┐
#                                        │  Serving Layer   │
#                                        │ (Merge Results)  │
#                                        │ Druid / Cassandra│
#                                        └──────────────────┘
#                                                  │
#                                                  ▼
#                                              Query API


Lambda Architecture — Practical Implementation

# Batch Layer: Spark job runs every 6 hours
# Reads ALL historical data, recomputes everything from scratch

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, sum as spark_sum, avg

spark = SparkSession.builder.appName("batch-layer").getOrCreate()

# Master dataset: immutable, append-only event log
events = spark.read.parquet("s3://data-lake/events/")

# Batch view: hourly aggregations computed from ALL data
batch_view = (events
    .groupBy(
        window(col("event_time"), "1 hour"),
        col("event_type"),
        col("region"))
    .agg(
        count("*").alias("event_count"),
        spark_sum("revenue").alias("total_revenue"),
        avg("latency_ms").alias("avg_latency"))
    .withColumn("window_start", col("window.start"))
    .withColumn("window_end", col("window.end"))
    .drop("window"))

# Write to serving layer (e.g., Cassandra, Druid, or Elasticsearch)
batch_view.write.mode("overwrite").parquet("s3://serving/batch-views/hourly/")

// Speed Layer: Flink job runs continuously
// Processes ONLY events since the last batch view

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);  // checkpoint every 60s

DataStream<Event> events = env
    .addSource(new FlinkKafkaConsumer<>("events", new EventSchema(), kafkaProps))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, ts) -> event.getEventTime()));

// Real-time view: incremental aggregations
DataStream<HourlyAgg> realtimeView = events
    .keyBy(Event::getRegion)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new RevenueAggregator());

// Write to serving layer (real-time views table)
realtimeView.addSink(new CassandraSink<>("realtime_views"));

Lambda Architecture trade-offs:
  • Pro: Correctness guarantee — batch layer eventually corrects any streaming approximations
  • Pro: Handles late-arriving data gracefully (batch recomputes everything)
  • Con: Two codebases — you maintain batch AND streaming logic, which must produce identical results
  • Con: Operational complexity — two systems to monitor, debug, and scale
  • Con: Merging views at query time adds latency and complexity
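
The serving layer's query-time merge can be sketched in Python; the views and keys below are hypothetical stand-ins for the Druid/Cassandra tables. Batch views are authoritative for everything up to the last batch run, and real-time views cover only what arrived since that cutoff.

```python
batch_views = {          # computed by the periodic Spark job
    ("2024-01-01", "US"): 1200.0,
    ("2024-01-02", "US"): 800.0,
}
realtime_views = {       # incremental Flink output since the last batch run
    ("2024-01-02", "US"): 150.0,   # events the batch layer hasn't seen yet
    ("2024-01-02", "EU"): 90.0,
}

def query_revenue(key):
    # Batch value is authoritative for its period; real-time fills the gap
    return batch_views.get(key, 0.0) + realtime_views.get(key, 0.0)

print(query_revenue(("2024-01-02", "US")))  # 950.0 = 800 batch + 150 speed
print(query_revenue(("2024-01-02", "EU")))  # 90.0, real-time only so far
```

When the next batch run completes, its views absorb the real-time data for that period and the corresponding real-time views are discarded, so the additive merge never double-counts.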

Kappa Architecture

Jay Kreps proposed Kappa architecture as a simplification: use only stream processing. Instead of maintaining separate batch and speed layers, treat everything as a stream and use Kafka's log retention as the source of truth.

# Kappa Architecture — Single Processing Path
#
#  ┌─────────────┐      ┌──────────────────────────────────┐
#  │ Data Source │ ───▶ │           Apache Kafka           │
#  └─────────────┘      │ (Immutable Log, Long Retention)  │
#                       │   Source of Truth for ALL Data   │
#                       └─────────────────┬────────────────┘
#                                         │
#                                         ▼
#                       ┌──────────────────────────────────┐
#                       │      Stream Processing Job       │
#                       │      (Flink / Kafka Streams)     │
#                       │     Single codebase, one logic   │
#                       └─────────────────┬────────────────┘
#                                         │
#                                         ▼
#                       ┌──────────────────────────────────┐
#                       │          Serving Layer           │
#                       │   (Materialized Views in DB)     │
#                       └──────────────────────────────────┘
#
# Reprocessing: Deploy a NEW version of the streaming job,
# have it re-read from Kafka's beginning, write to a NEW
# serving table. Once caught up, swap the tables.

Kappa Reprocessing Strategy

# How to "backfill" in Kappa: replay the Kafka log

# 1. Deploy Job v2 alongside Job v1 (different consumer group)
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
    --group pipeline-v2 --topic events \
    --reset-offsets --to-earliest --execute

# 2. Job v2 reads from the beginning of the topic, writing to new tables
#    serving_views_v2 instead of serving_views_v1

# 3. Monitor v2's consumer lag. When lag = 0, it's caught up.
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
    --group pipeline-v2 --describe
#   TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
#   events   0          15000000        15000000        0    ← caught up!

# 4. Atomically swap: point the query layer to v2 tables
# 5. Shut down Job v1 and delete v1 tables

| Aspect | Lambda | Kappa |
|---|---|---|
| Processing layers | Batch + Speed (two paths) | Stream only (single path) |
| Source of truth | Immutable master dataset (HDFS/S3) | Kafka log (long retention) |
| Codebases | Two (batch + stream logic) | One (stream only) |
| Reprocessing | Batch layer re-runs periodically | Replay Kafka log from offset 0 |
| Operational cost | High (two systems) | Lower (one system) |
| When it fails | Never (batch corrects streaming) | If retention expires before reprocessing |

Apache Spark Deep Dive

Spark is the dominant batch processing engine and increasingly used for micro-batch streaming. It processes data in-memory across a cluster, which makes it 10–100× faster than MapReduce for iterative algorithms.

Core Abstractions

RDDs (Resilient Distributed Datasets)

The original Spark abstraction — an immutable, partitioned collection of records that can be transformed in parallel.

from pyspark import SparkContext

sc = SparkContext("local[*]", "rdd-example")

# Create RDD from text file
logs = sc.textFile("s3://logs/access-log-2024-*.gz")

# Transformations (lazy — nothing executes yet)
errors = (logs
    .filter(lambda line: "ERROR" in line)          # narrow transformation
    .map(lambda line: line.split("\t"))             # narrow
    .map(lambda parts: (parts[3], 1))              # extract (error_type, 1)
    .reduceByKey(lambda a, b: a + b))              # wide transformation (shuffle)

# Action (triggers execution)
top_errors = errors.takeOrdered(10, key=lambda x: -x[1])
# [('NullPointerException', 42891), ('TimeoutException', 31204), ...]

# RDD Lineage (DAG of transformations):
# textFile → filter → map → map → reduceByKey → takeOrdered
# If a partition is lost, Spark re-executes only the necessary
# transformations on that partition.

DataFrames & Spark SQL

Higher-level API with schema awareness, Catalyst optimizer, and Tungsten execution engine. Preferred over RDDs for almost all use cases.

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, window, count, sum as spark_sum, avg, percentile_approx,
    from_json, to_timestamp, date_format, lit, when, broadcast, row_number
)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("analytics-pipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Read from multiple sources
orders = spark.read.parquet("s3://warehouse/orders/")
users = spark.read.parquet("s3://warehouse/users/")

# Broadcast join for small dimension table (avoids shuffle)
enriched = orders.join(broadcast(users), "user_id")

# Complex aggregation with window functions
from pyspark.sql.window import Window

user_window = Window.partitionBy("user_id").orderBy("order_date").rowsBetween(-6, 0)

result = (enriched
    .withColumn("rolling_avg_7d", avg("amount").over(user_window))
    .withColumn("order_rank", row_number().over(
        Window.partitionBy("user_id").orderBy(col("amount").desc())))
    .filter(col("order_rank") <= 5)  # top 5 orders per user
    .groupBy("region", "category")
    .agg(
        count("*").alias("order_count"),
        spark_sum("amount").alias("total_revenue"),
        avg("rolling_avg_7d").alias("avg_rolling"),
        percentile_approx("amount", 0.95).alias("p95_amount")))

# Spark SQL equivalent
enriched.createOrReplaceTempView("enriched_orders")
spark.sql("""
    WITH ranked AS (
        SELECT *,
            AVG(amount) OVER (
                PARTITION BY user_id ORDER BY order_date
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) AS rolling_avg_7d,
            ROW_NUMBER() OVER (
                PARTITION BY user_id ORDER BY amount DESC
            ) AS order_rank
        FROM enriched_orders
    )
    SELECT region, category,
           COUNT(*)           AS order_count,
           SUM(amount)        AS total_revenue,
           AVG(rolling_avg_7d) AS avg_rolling,
           PERCENTILE_APPROX(amount, 0.95) AS p95_amount
    FROM ranked WHERE order_rank <= 5
    GROUP BY region, category
""")

Spark Structured Streaming

Micro-batch streaming built on DataFrames. Treats a stream as an unbounded table and processes it in small batches; by default each micro-batch starts as soon as the previous one finishes, with fixed-interval triggers available for explicit control.

# Spark Structured Streaming — read from Kafka, write to Delta Lake
from pyspark.sql.functions import from_json, col, window, count, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("amount", DoubleType()),
    StructField("event_time", TimestampType())
])

# Read stream from Kafka
stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "latest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*"))

# Windowed aggregation (micro-batch processes every 30 seconds)
windowed = (stream
    .withWatermark("event_time", "10 minutes")  # allow 10-min late events
    .groupBy(
        window(col("event_time"), "5 minutes"),  # 5-min tumbling windows
        col("event_type"))
    .agg(
        count("*").alias("event_count"),
        spark_sum("amount").alias("total_amount")))

# Write to Delta Lake with checkpointing
query = (windowed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/windowed-events/")
    .trigger(processingTime="30 seconds")
    .start("s3://warehouse/windowed_events/"))

Apache Flink Deep Dive

Flink is a true stream-processing engine: it processes events one at a time rather than in micro-batches. This gives it lower latency, stronger event-time semantics, and more precise windowing than Spark Structured Streaming.

Watermarks

Watermarks are Flink's mechanism for tracking event-time progress. A watermark W(t) declares: "no events with timestamp ≤ t will arrive after this point."

// Flink: Watermark strategies

// 1. Bounded out-of-orderness: events may arrive up to N seconds late
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());
// If the latest event has timestamp 14:00:30, watermark = 14:00:25
// Any event with timestamp < 14:00:25 is considered "late"

// 2. Monotonously increasing timestamps (no out-of-order)
WatermarkStrategy.<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());
// Watermark = max(event timestamps seen so far)

// 3. Custom watermark generator for irregular sources
WatermarkStrategy.<Event>forGenerator(ctx -> new WatermarkGenerator<Event>() {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness = 30_000; // 30 seconds

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
    }
});

// Flink: Window assigners for keyed streams
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Event> events = /* ... */;

// ─── Tumbling Windows (fixed-size, non-overlapping) ───
// |----window1----|----window2----|----window3----|
// |  0:00 - 0:05  |  0:05 - 0:10  |  0:10 - 0:15  |
events.keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new CountAggregator());

// ─── Sliding Windows (fixed-size, overlapping) ───
// |------window1------|
//       |------window2------|
//             |------window3------|
// slide=2min, size=5min → each event is in ⌈5/2⌉ = 3 windows
events.keyBy(Event::getUserId)
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(2)))
    .aggregate(new CountAggregator());

// ─── Session Windows (gap-based, variable-size) ───
// |--session1--|    gap > 10min    |--session2--|
// Events within 10 min of each other merge into one session.
events.keyBy(Event::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new SessionAggregator());

// ─── Global Windows (all events in one window, needs custom trigger) ───
events.keyBy(Event::getUserId)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))  // fire every 100 events
    .aggregate(new BatchAggregator());

// Flink: Stateful fraud detection with keyed state

public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
    // State persisted across events, automatically checkpointed
    private ValueState<Boolean> flagState;
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration config) {
        flagState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("flag", Types.BOOLEAN));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Types.LONG));
    }

    @Override
    public void processElement(Transaction tx, Context ctx, Collector<Alert> out)
            throws Exception {
        Boolean flagged = flagState.value();

        if (flagged != null && flagged) {
            // Previous tx was small (<$1); a large one (>$500) arriving
            // before the 1-minute timer fires → FRAUD ALERT
            if (tx.getAmount() > 500.0) {
                out.collect(new Alert(tx.getAccountId(), tx.getAmount()));
            }
            // Cancel the pending timer before resetting state
            Long timer = timerState.value();
            if (timer != null) {
                ctx.timerService().deleteProcessingTimeTimer(timer);
            }
            clearState();
        }

        if (tx.getAmount() < 1.0) {
            // Small "test" transaction — set flag and timer
            flagState.update(true);
            long timer = ctx.timerService().currentProcessingTime() + 60_000;
            ctx.timerService().registerProcessingTimeTimer(timer);
            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out)
            throws Exception {
        // Timer fired — no large transaction within 1 minute, clear the flag
        clearState();
    }

    private void clearState() throws Exception {
        flagState.clear();
        timerState.clear();
    }
}

Windowing Strategies

Windows group unbounded streams into finite chunks for aggregation. The choice of windowing strategy determines the semantics of your pipeline.

| Window Type | Semantics | Use Case | Example |
|---|---|---|---|
| Tumbling | Fixed-size, non-overlapping, aligned to epoch | Periodic aggregation (hourly counts) | [0:00–0:05) [0:05–0:10) [0:10–0:15) |
| Sliding | Fixed-size, overlapping by slide interval | Moving averages, trend detection | 5-min window, 1-min slide → 5 overlapping windows |
| Session | Variable-size, bounded by inactivity gap | User sessions, click streams | Gap=10min: events merge if within 10 min |
| Global | All events in one window (needs trigger) | Count-based batching, custom logic | Fire every 1000 events or every 5 min |
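
The three time-based assigners can be modeled in a few lines of plain Python (timestamps in seconds; function names are illustrative, not Flink API):

```python
def tumbling(ts, size):
    # Every timestamp lands in exactly one window aligned to multiples of `size`
    start = (ts // size) * size
    return [(start, start + size)]

def sliding(ts, size, slide):
    # A timestamp lands in every window [start, start + size) whose start is
    # a multiple of `slide` (windows before t=0 are clipped)
    windows = []
    start = (ts // slide) * slide
    while start >= 0 and start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

def sessions(timestamps, gap):
    # Merge sorted events into sessions; a gap of more than `gap` seconds
    # between consecutive events closes the current session
    out = []
    for ts in sorted(timestamps):
        if out and ts - out[-1][1] <= gap:
            out[-1] = (out[-1][0], ts)      # extend the current session
        else:
            out.append((ts, ts))            # start a new session
    return out

print(tumbling(250, 300))                # [(0, 300)]
print(sliding(250, 300, 120))            # [(0, 300), (120, 420), (240, 540)]
print(sessions([10, 20, 200, 205], 60))  # [(10, 20), (200, 205)]
```

Note how the event at t=250 appears in one tumbling window but in three overlapping sliding windows (size 300s, slide 120s), and how session boundaries fall out of the data rather than the clock.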


Handling Late Events

// Flink: Allowed lateness and side output for late events

final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<AggResult> result = events
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2))    // re-fire window for late events
    .sideOutputLateData(lateTag)          // events arriving after lateness → side output
    .aggregate(new CountAggregator());

// Process truly late events separately (e.g., write to a "late events" table)
DataStream<Event> lateEvents = result.getSideOutput(lateTag);
lateEvents.addSink(new LateEventSink());

// Watermark timeline example:
// Window [14:00, 14:05) fires when watermark passes 14:05
// Event with timestamp 14:03 arrives when watermark is 14:06
//   → Allowed lateness = 2 min → 14:06 < 14:07 → window RE-FIRES with updated result
// Event with timestamp 14:01 arrives when watermark is 14:08
//   → 14:08 > 14:07 → goes to side output (truly late)

Kafka Streams & Apache Beam

Kafka Streams

A lightweight stream processing library (not a cluster framework). It runs as a regular Java application — no separate infrastructure. Perfect for microservice-level stream processing.

// Kafka Streams: Real-time page view counting
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-counter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);  // exactly-once semantics

StreamsBuilder builder = new StreamsBuilder();

// Input: page view events from Kafka topic
KStream<String, PageView> pageViews = builder.stream("page-views",
    Consumed.with(Serdes.String(), pageViewSerde));

// Windowed count: page views per URL per 5-minute window
KTable<Windowed<String>, Long> viewCounts = pageViews
    .groupBy((key, view) -> view.getUrl(),
             Grouped.with(Serdes.String(), pageViewSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("pageview-counts"));

// Output: write aggregated counts to output topic
viewCounts.toStream()
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key(),
        new PageViewCount(windowedKey.key(), count, windowedKey.window().start())))
    .to("pageview-counts-output", Produced.with(Serdes.String(), countSerde));

// Interactive queries: query state store directly from HTTP API
// No need for external database!
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

// Query local state store
ReadOnlyWindowStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "pageview-counts", QueryableStoreTypes.windowStore()));
WindowStoreIterator<Long> iter =
    store.fetch("/home", Instant.now().minus(Duration.ofHours(1)), Instant.now());

Apache Beam

Beam provides a unified programming model that runs on multiple backends (Flink, Spark, Dataflow, Samza). Write once, run anywhere.

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, SlidingWindows, Sessions
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode

# Apache Beam pipeline — runs on Flink, Spark, or Google Dataflow
with beam.Pipeline(options=pipeline_options) as p:
    events = (p
        | "ReadFromKafka" >> beam.io.ReadFromKafka(
            consumer_config={"bootstrap.servers": "kafka:9092"},
            topics=["user-events"])
        | "ParseJSON" >> beam.Map(parse_event)
        | "WithTimestamps" >> beam.Map(
            lambda e: beam.window.TimestampedValue(e, e["event_time"])))

    # Tumbling window with late data handling
    hourly_counts = (events
        | "HourlyWindow" >> beam.WindowInto(
            FixedWindows(3600),  # 1-hour tumbling window
            trigger=AfterWatermark(
                early=AfterProcessingTime(60),    # speculative results every 60s
                late=AfterProcessingTime(600)),    # late results every 10 min
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=7200)  # 2 hours
        | "KeyByType" >> beam.Map(lambda e: (e["event_type"], 1))  # KV pairs for CombinePerKey
        | "CountPerType" >> beam.CombinePerKey(sum)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            "project:dataset.hourly_counts",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Exactly-Once Semantics in Streaming

The holy grail of stream processing. Achieving exactly-once is about ensuring that each event affects the output exactly one time, even in the face of failures, restarts, and network partitions.

Three Delivery Guarantees

| Guarantee | Meaning | Implementation |
|---|---|---|
| At-most-once | Events may be lost, never duplicated | Commit offset before processing |
| At-least-once | Events never lost, may be duplicated | Commit offset after processing (retry on failure) |
| Exactly-once | Events processed exactly once in output | Checkpointing + transactions + idempotency |
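
A toy Python simulation of the practical difference (all names illustrative): at-least-once delivery can replay an event after a failed offset commit, and an idempotent sink that deduplicates on event ID restores an exactly-once *effect* on the output.

```python
class IdempotentSink:
    def __init__(self):
        self.applied = {}          # event_id -> amount, doubles as a dedup store

    def write(self, event_id, amount):
        if event_id in self.applied:
            return False           # duplicate delivery, ignored
        self.applied[event_id] = amount
        return True

    def total(self):
        return sum(self.applied.values())

sink = IdempotentSink()
# "e2" is redelivered, as if the consumer crashed between processing it
# and committing its offset (at-least-once behavior)
deliveries = [("e1", 10.0), ("e2", 5.0), ("e2", 5.0)]
results = [sink.write(eid, amt) for eid, amt in deliveries]

print(results)       # [True, True, False]  (the duplicate is detected)
print(sink.total())  # 15.0, each event counted once
```

Without the dedup check the replayed "e2" would be double-counted; this is why "sink supports idempotent writes" appears alongside checkpoints and transactions in the exactly-once recipe.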

Flink Checkpointing

// Flink achieves exactly-once via distributed snapshots (Chandy-Lamport algorithm)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(30_000);    // min 30s between checkpoints
config.setCheckpointTimeout(600_000);             // fail if checkpoint takes > 10 min
config.setMaxConcurrentCheckpoints(1);            // only 1 checkpoint at a time
config.setTolerableCheckpointFailureNumber(3);    // allow 3 failures before job fails
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // keep on cancel

// State backend: where state + checkpoints are stored
env.setStateBackend(new EmbeddedRocksDBStateBackend());  // for large state
config.setCheckpointStorage("s3://checkpoints/flink/my-job/");

// How it works:
// 1. JobManager injects "checkpoint barriers" into the data stream
// 2. Each barrier flows through the DAG like a regular event
// 3. When an operator receives barriers from ALL input channels:
//    a. It snapshots its state to the checkpoint storage
//    b. It forwards the barrier downstream
// 4. When ALL operators have snapshotted → checkpoint is complete
// 5. On failure: restore ALL operators to the last completed checkpoint
//    and replay events from Kafka starting at the checkpointed offsets

Kafka Transactions (End-to-End Exactly-Once)

// Kafka producer transactions: atomic read-process-write

Properties props = new Properties();
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  // required for transactions

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            // Process the record
            String result = transform(record.value());
            
            // Write output atomically
            producer.send(new ProducerRecord<>("output-topic", record.key(), result));
        }
        
        // Commit consumer offsets AS PART OF the transaction
        // This is the key: offset commit + output writes are atomic
        producer.sendOffsetsToTransaction(
            offsetsToCommit(records),
            consumer.groupMetadata());
        
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        // The consumer's in-memory position has already advanced past this
        // batch, so seek back to the last committed offsets before polling
        // again (or fail fast and restart). Either way the consumer resumes
        // from the last committed offset → no duplicates, no data loss
    }
}

// End-to-end exactly-once requires:
// 1. Source supports replay (Kafka: seek to offset)
// 2. Processing is deterministic (or state is checkpointed)
// 3. Sink supports transactions (Kafka, idempotent DB writes)
// 4. Atomic commit of offsets + state + output
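When the sink is a database rather than Kafka, requirement 3 is usually met with idempotent writes keyed by the source (topic, partition, offset). A minimal in-memory sketch — the class and names are illustrative, with a dict standing in for a table that has a unique constraint on that key:

```python
# Sketch of an idempotent sink: deduplicate on (topic, partition, offset).
# "rows" stands in for a DB table with a UNIQUE constraint on that key;
# the class and method names are illustrative, not a real library API.

class IdempotentSink:
    def __init__(self):
        self.rows = {}

    def write(self, topic, partition, offset, value):
        key = (topic, partition, offset)
        if key in self.rows:     # replayed record after a crash/restart
            return False         # no-op: this write was already applied
        self.rows[key] = value
        return True

sink = IdempotentSink()
sink.write("output-topic", 0, 42, "order-123")   # applied
sink.write("output-topic", 0, 42, "order-123")   # replay → skipped
```

Replaying the same offset after a failure becomes a no-op, which gives effectively-once output even without sink transactions.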

Data Quality & Schema Evolution

Schema Registry & Evolution

As producers and consumers evolve independently, schemas change. A schema registry (Confluent Schema Registry, AWS Glue Schema Registry) enforces compatibility rules so that a schema change cannot break existing readers or writers.

# Avro schema evolution example

# v1: Original schema
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id",    "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp",  "type": "long"}
  ]
}

# v2: BACKWARD COMPATIBLE — added a field with a default
# New consumers can still read old data (the missing field takes its default)
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id",    "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp",  "type": "long"},
    {"name": "region",     "type": "string", "default": "unknown"}  # ← NEW
  ]
}

# v3: FORWARD COMPATIBLE — removed the optional field
# Old consumers can still read new data (their schema supplies the default)

# Compatibility modes:
# BACKWARD   — new schema can read old data (safe for consumers to upgrade first)
# FORWARD    — old schema can read new data (safe for producers to upgrade first)
# FULL       — both backward AND forward compatible
# NONE       — no compatibility checks (dangerous!)

Data Quality Checks in Pipelines

# Great Expectations: data quality checks in a pipeline (pandas example)
import great_expectations as gx
from datetime import datetime, timedelta

context = gx.get_context()

# Define expectations for an orders dataset
validator = context.sources.pandas_default.read_dataframe(orders_df)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1_000_000)
validator.expect_column_values_to_be_in_set("status", ["PENDING", "COMPLETED", "CANCELLED"])
validator.expect_column_pair_values_a_to_be_greater_than_b("shipped_at", "created_at")
validator.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")

# Freshness check: no data older than 24 hours
validator.expect_column_max_to_be_between(
    "created_at",
    min_value=(datetime.now() - timedelta(hours=24)).isoformat())

results = validator.validate()
if not results.success:
    alert_on_call_engineer(results)  # PagerDuty / Slack alert
    quarantine_bad_records(results)  # move to dead-letter table

Backfill Strategies

Backfilling — reprocessing historical data through a new or updated pipeline — is one of the hardest operational challenges in data engineering.

| Strategy        | How It Works                           | Pros                      | Cons                                    |
|-----------------|----------------------------------------|---------------------------|-----------------------------------------|
| Full replay     | Re-read entire Kafka log or data lake  | Simple, correct           | Slow, expensive, Kafka retention limits |
| Dual-write      | Run old + new pipeline simultaneously  | Safe, can compare outputs | 2× resource cost                        |
| Partition-based | Reprocess one date partition at a time | Incremental, resumable    | Complex scheduling                      |
| Blue-green      | Write to new table, swap when done     | Zero downtime             | Needs 2× storage temporarily            |
# Airflow: Partition-based backfill with idempotent tasks
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_partition(ds, **kwargs):
    """Process a single date partition. Idempotent: overwrites output."""
    spark = get_spark_session()
    
    # Read only this partition
    events = spark.read.parquet(f"s3://data-lake/events/date={ds}/")
    
    # Transform
    result = transform_events(events)
    
    # Write with overwrite — makes this idempotent
    result.write.mode("overwrite").parquet(
        f"s3://warehouse/transformed/date={ds}/")

dag = DAG(
    "backfill_pipeline",
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 31),
    schedule_interval="@daily",
    max_active_runs=8,   # parallelize: 8 days at once
    catchup=True,        # process all historical dates
)

process = PythonOperator(
    task_id="process_partition",
    python_callable=process_partition,
    dag=dag,
)

# Run backfill:
# airflow dags backfill backfill_pipeline \
#     --start-date 2024-01-01 --end-date 2024-12-31 \
#     --reset-dagruns

Real-World Pipeline Architecture

A production-grade data platform at scale typically combines multiple patterns:

# Production Data Platform Architecture (e-commerce scale)
#
# ┌─────────────────────────────────────────────────────────────────────────┐
# │                         DATA SOURCES                                    │
# │  Web App │ Mobile App │ Payment Service │ Inventory │ Third-party APIs  │
# └─────┬─────────┬──────────────┬────────────┬─────────────┬──────────────┘
#       │         │              │            │             │
#       ▼         ▼              ▼            ▼             ▼
# ┌─────────────────────────────────────────────────────────────────────────┐
# │                    INGESTION LAYER                                      │
# │  Kafka (event streams)  │  Debezium CDC  │  Fivetran (SaaS connectors) │
# └──────────────┬────────────────┬──────────────────┬─────────────────────┘
#                │                │                   │
#       ┌────────┴───────┐       │                   │
#       ▼                ▼       ▼                   ▼
# ┌───────────┐  ┌────────────────────────────────────────────────────────┐
# │ SPEED     │  │              DATA LAKE (S3 / GCS / ADLS)               │
# │ LAYER     │  │   Raw Zone  │  Staging Zone  │  Curated Zone           │
# │ Flink     │  │  (Parquet)  │  (cleaned)     │  (analytics-ready)      │
# │ (real-    │  └──────┬─────────────┬────────────────┬──────────────────┘
# │  time)    │         │             │                │
# └─────┬─────┘         ▼             ▼                ▼
#       │        ┌─────────────────────────────────────────────────────────┐
#       │        │          BATCH PROCESSING (Spark + Airflow + dbt)       │
#       │        │  Data quality checks (Great Expectations)               │
#       │        │  Feature engineering for ML                             │
#       │        │  Aggregations, joins, deduplication                     │
#       │        └──────────────────────────┬──────────────────────────────┘
#       │                                   │
#       ▼                                   ▼
# ┌─────────────────────────────────────────────────────────────────────────┐
# │                     SERVING LAYER                                       │
# │  Snowflake/BigQuery │ Elasticsearch │ Redis │ Druid │ Pinot            │
# │  (analytics)        │ (search)      │ (cache)│ (OLAP)│ (real-time)     │
# └──────────────────────────────────┬──────────────────────────────────────┘
#                                    │
#                                    ▼
# ┌─────────────────────────────────────────────────────────────────────────┐
# │                     CONSUMPTION LAYER                                   │
# │  Dashboards (Looker) │ ML Models │ APIs │ Alerts │ Reports              │
# └─────────────────────────────────────────────────────────────────────────┘

Key Takeaways