Data Pipelines & Stream Processing
Batch vs Stream Processing
Data processing falls into two fundamental paradigms. The choice between them dictates your architecture, latency budget, infrastructure cost, and operational complexity.
| Dimension | Batch Processing | Stream Processing |
|---|---|---|
| Data model | Bounded datasets (files, tables) | Unbounded event streams |
| Latency | Minutes to hours | Milliseconds to seconds |
| Throughput | Very high (optimized for bulk) | High (per-event overhead) |
| State management | Implicit (full dataset available) | Explicit (state stores, checkpoints) |
| Fault tolerance | Restart the job | Checkpoints, exactly-once guarantees |
| Scheduling | Cron, Airflow, Dagster | Always-on, event-driven |
| Cost model | Pay per run (can use spot instances) | Continuous resource allocation |
| Examples | Spark batch, Hive, MapReduce, dbt | Flink, Kafka Streams, Spark Structured Streaming |
- Batch — nightly analytics, ML model training, data warehouse loading, historical reprocessing
- Stream — fraud detection, real-time dashboards, event-driven microservices, anomaly detection
- Micro-batch — a compromise (Spark Structured Streaming): near-real-time latency with batch-like simplicity
Time Semantics: The Core Distinction
In streaming, when an event happened and when it arrives are different things:
Event Time: The timestamp embedded in the event (when it actually occurred)
Processing Time: The wall-clock time when the system processes the event
Ingestion Time: The timestamp when the event enters the streaming system
Example:
User clicks "Buy" at 14:00:01.000 (event time)
Event reaches Kafka at 14:00:01.200 (ingestion time)
Flink processes it at 14:00:03.500 (processing time)
The 2.5-second gap matters for:
- Late-arriving events (mobile users with flaky connections)
- Out-of-order events (distributed producers)
- Cross-timezone aggregations
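These three timestamps can be made concrete with a minimal sketch (plain Python, no framework; the window size and lateness bound are arbitrary choices for illustration): assign each event to a tumbling event-time window and flag any event whose timestamp is already behind the watermark when it arrives.

```python
from dataclasses import dataclass

WINDOW_MS = 5_000        # 5-second tumbling windows (arbitrary)
MAX_LATENESS_MS = 2_000  # watermark trails max event time by 2s (arbitrary)

@dataclass
class Event:
    event_time_ms: int    # when it actually happened (embedded timestamp)
    arrival_time_ms: int  # when the system sees it (processing time)

def window_start(ts_ms: int) -> int:
    """Tumbling-window assignment: align timestamp to the window size."""
    return ts_ms - (ts_ms % WINDOW_MS)

def classify(events):
    """Replay events in arrival order; flag any event whose timestamp is
    already behind the watermark (max event time seen, minus lateness)."""
    watermark = float("-inf")
    out = []
    for e in sorted(events, key=lambda e: e.arrival_time_ms):
        late = e.event_time_ms < watermark
        out.append((window_start(e.event_time_ms), late))
        watermark = max(watermark, e.event_time_ms - MAX_LATENESS_MS)
    return out

events = [
    Event(event_time_ms=1_000, arrival_time_ms=1_200),  # on time
    Event(event_time_ms=6_000, arrival_time_ms=6_100),  # on time, next window
    Event(event_time_ms=2_000, arrival_time_ms=7_000),  # out of order AND late
]
print(classify(events))
# → [(0, False), (5000, False), (0, True)]
```

The third event belongs to the first window but arrives after the watermark has passed it; a real engine must decide whether to re-fire the window, route it to a side output, or drop it.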
ETL vs ELT
Two philosophies for moving data from source systems into analytics:
ETL — Extract, Transform, Load
Transform data before loading into the destination. The traditional approach from the data warehouse era.
# Classic ETL with Apache Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as spark_sum, when
spark = SparkSession.builder.appName("etl-orders").getOrCreate()
# 1. EXTRACT — read raw data from source
raw_orders = (spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://oltp-db:5432/production")
.option("dbtable", "orders")
.option("user", "etl_reader")
.load())
raw_products = spark.read.parquet("s3://data-lake/products/")
# 2. TRANSFORM — clean, join, aggregate BEFORE loading
cleaned_orders = (raw_orders
.filter(col("status") != "CANCELLED")
.filter(col("amount") > 0)
.withColumn("order_date", to_date(col("created_at")))
.withColumn("is_high_value", when(col("amount") > 1000, True).otherwise(False))
.join(raw_products, "product_id", "left")
.select("order_id", "user_id", "product_name", "category",
"amount", "order_date", "is_high_value", "region"))
daily_revenue = (cleaned_orders
.groupBy("order_date", "category", "region")
.agg(spark_sum("amount").alias("total_revenue"),
spark_sum(when(col("is_high_value"), 1).otherwise(0)).alias("high_value_count")))
# 3. LOAD — write transformed data to warehouse
daily_revenue.write.mode("overwrite").partitionBy("order_date").parquet(
"s3://warehouse/daily_revenue/"
)
ELT — Extract, Load, Transform
Load raw data first, transform inside the destination. Powered by modern cloud warehouses (BigQuery, Snowflake, Redshift) with cheap storage and powerful compute.
-- ELT: Raw data is already loaded into the warehouse via Fivetran/Airbyte/Stitch.
-- Transformation happens inside the warehouse using dbt (data build tool).
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('production', 'raw_orders') }}
),
cleaned AS (
SELECT
order_id,
user_id,
product_id,
amount,
CAST(created_at AS DATE) AS order_date,
status,
CASE WHEN amount > 1000 THEN TRUE ELSE FALSE END AS is_high_value
FROM source
WHERE status != 'CANCELLED'
AND amount > 0
)
SELECT * FROM cleaned;
-- models/marts/daily_revenue.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
products AS (
SELECT * FROM {{ ref('stg_products') }}
)
SELECT
o.order_date,
p.category,
p.region,
SUM(o.amount) AS total_revenue,
COUNT(CASE WHEN o.is_high_value THEN 1 END) AS high_value_count,
COUNT(*) AS order_count
FROM orders o
LEFT JOIN products p ON o.product_id = p.product_id
GROUP BY 1, 2, 3
| Aspect | ETL | ELT |
|---|---|---|
| Transform location | External (Spark, Airflow, custom code) | Inside the warehouse (SQL, dbt) |
| Raw data retained? | No (only transformed data loaded) | Yes (raw data always available) |
| Flexibility | New transforms need pipeline changes | New transforms = new SQL model |
| Debugging | Hard (intermediate data discarded) | Easy (query raw data anytime) |
| Best for | Sensitive data, heavy transformations | Analytics, iterative exploration |
Lambda Architecture
Proposed by Nathan Marz, Lambda architecture combines batch and stream processing to provide comprehensive, low-latency views of data. It acknowledges that batch processing gives accurate results but high latency, while stream processing gives low latency but approximate results.
Three Layers
- Batch Layer — Stores the master dataset (immutable, append-only). Periodically runs batch jobs (Spark, Hadoop) to pre-compute batch views. High latency (hours) but perfectly accurate.
- Speed Layer — Processes only the most recent data in real-time (Flink, Storm, Kafka Streams). Produces real-time views that compensate for the batch layer's latency. Lower accuracy (approximations, at-least-once).
- Serving Layer — Merges batch views and real-time views to answer queries. When a new batch view is available, the corresponding real-time view can be discarded.
# Lambda Architecture — Conceptual Data Flow
#
# ┌────────────┐
# │ Data Source │ ─── All Events ───┬──────────────────────────────────┐
# └────────────┘ │ │
# ▼ ▼
# ┌─────────────────┐ ┌──────────────────┐
# │ Batch Layer │ │ Speed Layer │
# │ (Master Dataset │ │ (Real-time only) │
# │ + Spark Jobs) │ │ Flink / KStreams │
# └────────┬────────┘ └────────┬─────────┘
# │ │
# ▼ ▼
# ┌─────────────────┐ ┌──────────────────┐
# │ Batch Views │ │ Real-time Views │
# │ (pre-computed) │ │ (incremental) │
# └────────┬────────┘ └────────┬─────────┘
# │ │
# └───────────┬───────────────────┘
# ▼
# ┌─────────────────┐
# │ Serving Layer │
# │ (Merge Results) │
# │ Druid / Cassandra│
# └─────────────────┘
# │
# ▼
# Query API
▶ Lambda Architecture
Watch data flow through both the Batch Layer (periodic Spark) and Speed Layer (real-time Flink), merging in the Serving Layer.
Lambda Architecture — Practical Implementation
# Batch Layer: Spark job runs every 6 hours
# Reads ALL historical data, recomputes everything from scratch
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, sum as spark_sum, avg
spark = SparkSession.builder.appName("batch-layer").getOrCreate()
# Master dataset: immutable, append-only event log
events = spark.read.parquet("s3://data-lake/events/")
# Batch view: hourly aggregations computed from ALL data
batch_view = (events
.groupBy(
window(col("event_time"), "1 hour"),
col("event_type"),
col("region"))
.agg(
count("*").alias("event_count"),
spark_sum("revenue").alias("total_revenue"),
avg("latency_ms").alias("avg_latency"))
.withColumn("window_start", col("window.start"))
.withColumn("window_end", col("window.end"))
.drop("window"))
# Write to serving layer (e.g., Cassandra, Druid, or Elasticsearch)
batch_view.write.mode("overwrite").parquet("s3://serving/batch-views/hourly/")
// Speed Layer: Flink job runs continuously
// Processes ONLY events since the last batch view
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.time.Duration;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60s
DataStream<Event> events = env
.addSource(new FlinkKafkaConsumer<>("events", new EventSchema(), kafkaProps))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getEventTime()));
// Real-time view: incremental aggregations
DataStream<HourlyAgg> realtimeView = events
.keyBy(Event::getRegion)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new RevenueAggregator());
// Write to the real-time views table. (Sink shown is illustrative; the real
// Flink Cassandra connector is wired up via CassandraSink.addSink(realtimeView).)
realtimeView.addSink(new CassandraSink<>("realtime_views"));
- Pro: Correctness guarantee — batch layer eventually corrects any streaming approximations
- Pro: Handles late-arriving data gracefully (batch recomputes everything)
- Con: Two codebases — you maintain batch AND streaming logic, which must produce identical results
- Con: Operational complexity — two systems to monitor, debug, and scale
- Con: Merging views at query time adds latency and complexity
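The serving-layer merge that the last con refers to can be sketched in a few lines (the names and the `batch_high_water` cutoff are hypothetical): batch views win wherever they exist, and real-time views fill only the windows the last batch run has not yet covered.

```python
def merge_views(batch_view: dict, realtime_view: dict, batch_high_water: int) -> dict:
    """Answer a query by combining pre-computed batch views with
    incremental real-time views.  Keys are window-start timestamps.
    Batch results win for windows at or before the batch high-water
    mark; real-time results are used only for newer windows."""
    merged = dict(batch_view)
    for window, value in realtime_view.items():
        if window > batch_high_water:  # only the gap the batch hasn't covered
            merged[window] = value
    return merged

batch_view    = {100: 42, 200: 57}  # accurate, recomputed from all data
realtime_view = {200: 55, 300: 9}   # approximate, recent events only
print(merge_views(batch_view, realtime_view, batch_high_water=200))
# → {100: 42, 200: 57, 300: 9}
```

Note that for window 200 the batch value (57) overrides the streaming approximation (55): this is the "batch corrects streaming" guarantee at work.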
Kappa Architecture
Jay Kreps proposed Kappa architecture as a simplification: use only stream processing. Instead of maintaining separate batch and speed layers, treat everything as a stream and use Kafka's log retention as the source of truth.
# Kappa Architecture — Single Processing Path
#
# ┌────────────┐ ┌─────────────────────────────────┐
# │ Data Source │ ───▶ │ Apache Kafka │
# └────────────┘ │ (Immutable Log, Long Retention)│
# │ Source of Truth for ALL Data │
# └──────────────┬──────────────────┘
# │
# ▼
# ┌─────────────────────────────────┐
# │ Stream Processing Job │
# │ (Flink / Kafka Streams) │
# │ Single codebase, one logic │
# └──────────────┬──────────────────┘
# │
# ▼
# ┌─────────────────────────────────┐
# │ Serving Layer │
# │ (Materialized Views in DB) │
# └─────────────────────────────────┘
#
# Reprocessing: Deploy a NEW version of the streaming job,
# have it re-read from Kafka's beginning, write to a NEW
# serving table. Once caught up, swap the tables.
Kappa Reprocessing Strategy
# How to "backfill" in Kappa: replay the Kafka log
# 1. Deploy Job v2 alongside Job v1 (different consumer group)
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
--group pipeline-v2 --topic events \
--reset-offsets --to-earliest --execute
# 2. Job v2 reads from the beginning of the topic, writing to new tables
# serving_views_v2 instead of serving_views_v1
# 3. Monitor v2's consumer lag. When lag = 0, it's caught up.
$ kafka-consumer-groups --bootstrap-server kafka:9092 \
--group pipeline-v2 --describe
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# events 0 15000000 15000000 0 ← caught up!
# 4. Atomically swap: point the query layer to v2 tables
# 5. Shut down Job v1 and delete v1 tables
| Aspect | Lambda | Kappa |
|---|---|---|
| Processing layers | Batch + Speed (two paths) | Stream only (single path) |
| Source of truth | Immutable master dataset (HDFS/S3) | Kafka log (long retention) |
| Codebases | Two (batch + stream logic) | One (stream only) |
| Reprocessing | Batch layer re-runs periodically | Replay Kafka log from offset 0 |
| Operational cost | High (two systems) | Lower (one system) |
| Failure mode | Streaming errors are eventually corrected by the batch layer | Reprocessing is impossible once retention expires |
Apache Spark Deep Dive
Spark is the dominant batch processing engine and increasingly used for micro-batch streaming. It processes data in-memory across a cluster, which makes it 10–100× faster than MapReduce for iterative algorithms.
Core Abstractions
RDDs (Resilient Distributed Datasets)
The original Spark abstraction — an immutable, partitioned collection of records that can be transformed in parallel.
from pyspark import SparkContext
sc = SparkContext("local[*]", "rdd-example")
# Create RDD from text file
logs = sc.textFile("s3://logs/access-log-2024-*.gz")
# Transformations (lazy — nothing executes yet)
errors = (logs
.filter(lambda line: "ERROR" in line) # narrow transformation
.map(lambda line: line.split("\t")) # narrow
.map(lambda parts: (parts[3], 1)) # extract (error_type, 1)
.reduceByKey(lambda a, b: a + b)) # wide transformation (shuffle)
# Action (triggers execution)
top_errors = errors.takeOrdered(10, key=lambda x: -x[1])
# [('NullPointerException', 42891), ('TimeoutException', 31204), ...]
# RDD Lineage (DAG of transformations):
# textFile → filter → map → map → reduceByKey → takeOrdered
# If a partition is lost, Spark re-executes only the necessary
# transformations on that partition.
DataFrames & Spark SQL
Higher-level API with schema awareness, Catalyst optimizer, and Tungsten execution engine. Preferred over RDDs for almost all use cases.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, window, count, sum as spark_sum, avg, percentile_approx,
    from_json, to_timestamp, date_format, lit, when, broadcast, row_number
)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("analytics-pipeline") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# Read from multiple sources
orders = spark.read.parquet("s3://warehouse/orders/")
users = spark.read.parquet("s3://warehouse/users/")
# Broadcast join for small dimension table (avoids shuffle)
enriched = orders.join(broadcast(users), "user_id")
# Complex aggregation with window functions
from pyspark.sql.window import Window
user_window = Window.partitionBy("user_id").orderBy("order_date").rowsBetween(-6, 0)
result = (enriched
.withColumn("7day_rolling_avg", avg("amount").over(user_window))
.withColumn("order_rank", row_number().over(
Window.partitionBy("user_id").orderBy(col("amount").desc())))
.filter(col("order_rank") <= 5) # top 5 orders per user
.groupBy("region", "category")
.agg(
count("*").alias("order_count"),
spark_sum("amount").alias("total_revenue"),
avg("7day_rolling_avg").alias("avg_rolling"),
percentile_approx("amount", 0.95).alias("p95_amount")))
# Spark SQL equivalent
enriched.createOrReplaceTempView("enriched_orders")
spark.sql("""
WITH ranked AS (
SELECT *,
AVG(amount) OVER (
PARTITION BY user_id ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS rolling_avg_7d,
ROW_NUMBER() OVER (
PARTITION BY user_id ORDER BY amount DESC
) AS order_rank
FROM enriched_orders
)
SELECT region, category,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
AVG(rolling_avg_7d) AS avg_rolling,
PERCENTILE_APPROX(amount, 0.95) AS p95_amount
FROM ranked WHERE order_rank <= 5
GROUP BY region, category
""")
Spark Structured Streaming
Micro-batch streaming built on DataFrames. Treats a stream as an unbounded table and processes it in small batches; by default each micro-batch starts as soon as the previous one finishes, or you can set a fixed trigger interval.
# Spark Structured Streaming — read from Kafka, write to Delta Lake
from pyspark.sql.functions import from_json, col, window, count, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
schema = StructType([
StructField("user_id", StringType()),
StructField("event_type", StringType()),
StructField("amount", DoubleType()),
StructField("event_time", TimestampType())
])
# Read stream from Kafka
stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "latest")
.load()
.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.*"))
# Windowed aggregation (micro-batch processes every 30 seconds)
windowed = (stream
.withWatermark("event_time", "10 minutes") # allow 10-min late events
.groupBy(
window(col("event_time"), "5 minutes"), # 5-min tumbling windows
col("event_type"))
.agg(
count("*").alias("event_count"),
spark_sum("amount").alias("total_amount")))
# Write to Delta Lake with checkpointing
query = (windowed.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://checkpoints/windowed-events/")
.trigger(processingTime="30 seconds")
.start("s3://warehouse/windowed_events/"))
Apache Flink Deep Dive
Flink is a true stream processing engine — it processes events one at a time (not in micro-batches). This gives it lower latency, better event-time semantics, and more precise windowing than Spark Streaming.
Event Time & Watermarks
Watermarks are Flink's mechanism for tracking event-time progress. A watermark W(t) declares: "No events with timestamp ≤ t will arrive after this point."
// Flink: Watermark strategies
// 1. Bounded out-of-orderness: events may arrive up to N seconds late
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());
// If the latest event has timestamp 14:00:30, watermark = 14:00:25
// Any event with timestamp < 14:00:25 is considered "late"
// 2. Monotonously increasing timestamps (no out-of-order)
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());
// Watermark = max(event timestamps seen so far)
// 3. Custom watermark generator for irregular sources
WatermarkStrategy.<Event>forGenerator(ctx -> new WatermarkGenerator<Event>() {
private long maxTimestamp = Long.MIN_VALUE;
private final long maxOutOfOrderness = 30_000; // 30 seconds
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
}
});
Flink Window Types
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
DataStream<Event> events = /* ... */;
// ─── Tumbling Windows (fixed-size, non-overlapping) ───
// |----window1----|----window2----|----window3----|
// | 0:00 - 0:05 | 0:05 - 0:10 | 0:10 - 0:15 |
events.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAggregator());
// ─── Sliding Windows (fixed-size, overlapping) ───
// |------window1------|
// |------window2------|
// |------window3------|
// slide=2min, size=5min → each event is in ⌈5/2⌉ = 3 windows
events.keyBy(Event::getUserId)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(2)))
.aggregate(new CountAggregator());
// ─── Session Windows (gap-based, variable-size) ───
// |--session1--| gap > 10min |--session2--|
// Events within 10 min of each other merge into one session.
events.keyBy(Event::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.aggregate(new SessionAggregator());
// ─── Global Windows (all events in one window, needs custom trigger) ───
events.keyBy(Event::getUserId)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100)) // fire every 100 events
.aggregate(new BatchAggregator());
Stateful Stream Processing
// Flink: Stateful fraud detection with keyed state
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
// State persisted across events, automatically checkpointed
private ValueState<Boolean> flagState;
private ValueState<Long> timerState;
@Override
public void open(Configuration config) {
flagState = getRuntimeContext().getState(
new ValueStateDescriptor<>("flag", Types.BOOLEAN));
timerState = getRuntimeContext().getState(
new ValueStateDescriptor<>("timer", Types.LONG));
}
@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out)
throws Exception {
Boolean flagged = flagState.value();
if (flagged != null && flagged) {
// Previous tx was small (<$1), and this one is large (>$500)
// within 1 minute → FRAUD ALERT
if (tx.getAmount() > 500.0) {
out.collect(new Alert(tx.getAccountId(), tx.getAmount()));
}
clearState();
}
if (tx.getAmount() < 1.0) {
// Small "test" transaction — set flag and timer
flagState.update(true);
long timer = ctx.timerService().currentProcessingTime() + 60_000;
ctx.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
// Timer fired — no large transaction within 1 minute, clear flag
clearState();
}
private void clearState() {
flagState.clear();
Long timer = timerState.value();
if (timer != null) timerState.clear();
}
}
Windowing Strategies
Windows group unbounded streams into finite chunks for aggregation. The choice of windowing strategy determines the semantics of your pipeline.
| Window Type | Semantics | Use Case | Example |
|---|---|---|---|
| Tumbling | Fixed-size, non-overlapping, aligned to epoch | Periodic aggregation (hourly counts) | [0:00–0:05) [0:05–0:10) [0:10–0:15) |
| Sliding | Fixed-size, overlapping by slide interval | Moving averages, trend detection | 5-min window, 1-min slide → 5 overlapping windows |
| Session | Variable-size, bounded by inactivity gap | User sessions, click streams | Gap=10min: events merge if within 10 min |
| Global | All events in one window (needs trigger) | Count-based batching, custom logic | Fire every 1000 events or every 5 min |
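A quick way to check the semantics in the table is to compute window membership by hand. A sketch in plain Python (timestamps and sizes in arbitrary integer units; windows are half-open `[start, end)`):

```python
def tumbling_windows(ts, size):
    """Each event falls in exactly one tumbling window."""
    start = ts - ts % size
    return [(start, start + size)]

def sliding_windows(ts, size, slide):
    """Each event falls in every window whose span covers it,
    with window starts aligned to multiples of the slide."""
    last = ts - ts % slide
    return [(s, s + size) for s in range(last, ts - size, -slide)][::-1]

def session_windows(timestamps, gap):
    """Merge timestamps into sessions separated by gaps larger than `gap`."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return [(s[0], s[-1]) for s in sessions]

print(tumbling_windows(12, 5))                  # → [(10, 15)]
print(sliding_windows(6, 5, 2))                 # → [(2, 7), (4, 9), (6, 11)]
print(session_windows([1, 2, 15, 16, 40], 10))  # → [(1, 2), (15, 16), (40, 40)]
```

The sliding case shows the overlap: one event at t=6 lands in three 5-unit windows sliding by 2, matching the ⌈5/2⌉ = 3 rule of thumb from the Flink section.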
▶ Stream Windowing
Visualize how the same events are grouped differently by tumbling, sliding, and session windows.
Handling Late Events
// Flink: Allowed lateness and side output for late events
final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};
SingleOutputStreamOperator<AggResult> result = events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(2)) // re-fire window for late events
.sideOutputLateData(lateTag) // events arriving after lateness → side output
.aggregate(new CountAggregator());
// Process truly late events separately (e.g., write to a "late events" table)
DataStream<Event> lateEvents = result.getSideOutput(lateTag);
lateEvents.addSink(new LateEventSink());
// Watermark timeline example:
// Window [14:00, 14:05) fires when watermark passes 14:05
// Event with timestamp 14:03 arrives when watermark is 14:06
// → Allowed lateness = 2 min → 14:06 < 14:07 → window RE-FIRES with updated result
// Event with timestamp 14:01 arrives when watermark is 14:08
// → 14:08 > 14:07 → goes to side output (truly late)
Kafka Streams & Apache Beam
Kafka Streams
A lightweight stream processing library (not a cluster framework). It runs as a regular Java application — no separate infrastructure. Perfect for microservice-level stream processing.
// Kafka Streams: Real-time page view counting
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pageview-counter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // exactly-once semantics
StreamsBuilder builder = new StreamsBuilder();
// Input: page view events from Kafka topic
KStream<String, PageView> pageViews = builder.stream("page-views",
Consumed.with(Serdes.String(), pageViewSerde));
// Windowed count: page views per URL per 5-minute window
KTable<Windowed<String>, Long> viewCounts = pageViews
.groupBy((key, view) -> view.getUrl(),
Grouped.with(Serdes.String(), pageViewSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count(Materialized.as("pageview-counts"));
// Output: write aggregated counts to output topic
viewCounts.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(),
new PageViewCount(windowedKey.key(), count, windowedKey.window().start())))
.to("pageview-counts-output", Produced.with(Serdes.String(), countSerde));
// Interactive queries: query state store directly from HTTP API
// No need for external database!
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Query local state store
ReadOnlyWindowStore<String, Long> store =
streams.store(StoreQueryParameters.fromNameAndType(
"pageview-counts", QueryableStoreTypes.windowStore()));
WindowStoreIterator<Long> iter = store.fetch("/home", Instant.now().minus(1, ChronoUnit.HOURS), Instant.now());
Apache Beam
Beam provides a unified programming model that runs on multiple backends (Flink, Spark, Dataflow, Samza). Write once, run anywhere.
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, SlidingWindows, Sessions
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
# Apache Beam pipeline — runs on Flink, Spark, or Google Dataflow
with beam.Pipeline(options=pipeline_options) as p:
    events = (p
        | "ReadFromKafka" >> beam.io.ReadFromKafka(
            consumer_config={"bootstrap.servers": "kafka:9092"},
            topics=["user-events"])
        | "ParseJSON" >> beam.Map(parse_event)
        | "WithTimestamps" >> beam.Map(
            lambda e: beam.window.TimestampedValue(e, e["event_time"])))
    # Tumbling window with late data handling
    hourly_counts = (events
        | "HourlyWindow" >> beam.WindowInto(
            FixedWindows(3600),  # 1-hour tumbling window
            trigger=AfterWatermark(
                early=AfterProcessingTime(60),   # speculative results every 60s
                late=AfterProcessingTime(600)),  # late results every 10 min
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=7200)  # 2 hours
        | "CountPerType" >> beam.CombinePerKey(sum)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            "project:dataset.hourly_counts",
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Exactly-Once Semantics in Streaming
The holy grail of stream processing: ensuring that each event affects the output exactly once, even in the face of failures, restarts, and network partitions.
Three Delivery Guarantees
| Guarantee | Meaning | Implementation |
|---|---|---|
| At-most-once | Events may be lost, never duplicated | Commit offset before processing |
| At-least-once | Events never lost, may be duplicated | Commit offset after processing (retry on failure) |
| Exactly-once | Events processed exactly once in output | Checkpointing + transactions + idempotency |
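One way to see how the guarantees interact: under at-least-once delivery, a retry redelivers events, and an idempotent sink keyed by a unique event ID absorbs the duplicates. A minimal sketch (deduplication by ID is a common technique, not tied to any one framework):

```python
class IdempotentSink:
    """Deduplicates by event ID: replayed deliveries don't change the output.
    At-least-once delivery + idempotent writes gives an exactly-once EFFECT."""
    def __init__(self):
        self.seen = set()
        self.total = 0

    def write(self, event_id, amount):
        if event_id in self.seen:  # duplicate delivery after a retry
            return False
        self.seen.add(event_id)
        self.total += amount
        return True

sink = IdempotentSink()
# First delivery attempt "crashes" after event "b", so both are redelivered
for event_id, amount in [("a", 10), ("b", 5), ("a", 10), ("b", 5), ("c", 1)]:
    sink.write(event_id, amount)
print(sink.total)
# → 16, not 31: the duplicates were absorbed
```

The catch in production is that `seen` must itself survive failures and be bounded (TTL, compaction), which is exactly what checkpointed state and transactional sinks provide.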
Flink Checkpointing
// Flink achieves exactly-once via distributed snapshots (Chandy-Lamport algorithm)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
// Checkpoint configuration
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(30_000); // min 30s between checkpoints
config.setCheckpointTimeout(600_000); // fail if checkpoint takes > 10 min
config.setMaxConcurrentCheckpoints(1); // only 1 checkpoint at a time
config.setTolerableCheckpointFailureNumber(3); // allow 3 failures before job fails
config.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep on cancel
// State backend: where state + checkpoints are stored
env.setStateBackend(new EmbeddedRocksDBStateBackend()); // for large state
config.setCheckpointStorage("s3://checkpoints/flink/my-job/");
// How it works:
// 1. JobManager injects "checkpoint barriers" into the data stream
// 2. Each barrier flows through the DAG like a regular event
// 3. When an operator receives barriers from ALL input channels:
// a. It snapshots its state to the checkpoint storage
// b. It forwards the barrier downstream
// 4. When ALL operators have snapshotted → checkpoint is complete
// 5. On failure: restore ALL operators to the last completed checkpoint
// and replay events from Kafka starting at the checkpointed offsets
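The barrier protocol in the comments above can be simulated in plain Python (a toy model of one operator, not Flink's implementation): input from a channel whose barrier has already arrived is held back until barriers from all channels are in, so the snapshot contains only pre-barrier effects.

```python
def run_operator(channels):
    """Simulate barrier alignment on one operator.
    channels: dict name -> list of ints, with "BARRIER" marking the
    checkpoint barrier.  A channel whose barrier has arrived is blocked
    until barriers from ALL channels are in; then state is snapshotted."""
    state, snapshot = 0, None
    blocked = set()
    queues = {name: list(items) for name, items in channels.items()}
    while any(queues.values()):
        for name, q in queues.items():
            if not q or name in blocked:
                continue
            item = q.pop(0)
            if item == "BARRIER":
                blocked.add(name)
                if blocked == set(queues):  # barrier received on every input
                    snapshot = state        # consistent snapshot: pre-barrier only
                    blocked.clear()         # forward barrier, resume all inputs
            else:
                state += item               # normal processing
    return state, snapshot

# Post-barrier data on B (the 20) must NOT leak into the snapshot,
# even though it is ready before A's barrier arrives.
print(run_operator({"A": [1, 2, "BARRIER", 10], "B": [3, "BARRIER", 20]}))
# → (36, 6): the snapshot holds exactly the pre-barrier sum 1 + 2 + 3
```

Blocking channel B after its barrier is what makes the snapshot consistent; without it, the 20 could be counted in state that claims to be pre-barrier.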
Kafka Transactions (End-to-End Exactly-Once)
// Kafka producer transactions: atomic read-process-write
Properties props = new Properties();
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-producer");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // required for transactions
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// Process the record
String result = transform(record.value());
// Write output atomically
producer.send(new ProducerRecord<>("output-topic", record.key(), result));
}
// Commit consumer offsets AS PART OF the transaction
// This is the key: offset commit + output writes are atomic
producer.sendOffsetsToTransaction(
offsetsToCommit(records),
consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
// On restart: consumer reads from last committed offset
// → no duplicates, no data loss
}
}
// End-to-end exactly-once requires:
// 1. Source supports replay (Kafka: seek to offset)
// 2. Processing is deterministic (or state is checkpointed)
// 3. Sink supports transactions (Kafka, idempotent DB writes)
// 4. Atomic commit of offsets + state + output
Data Quality & Schema Evolution
Schema Registry & Evolution
As producers and consumers evolve independently, schemas change. A schema registry (Confluent Schema Registry, AWS Glue) enforces compatibility rules.
# Avro schema evolution example
# v1: Original schema
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
# v2: BACKWARD COMPATIBLE — added a field with a default
# Consumers on the new schema can still read old data (the default fills the gap)
{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "region", "type": "string", "default": "unknown"}  # ← NEW in v2
]
}
# v3: FORWARD COMPATIBLE — removed an optional field
# Old consumers can still read new data (the removed field falls back to its default)
# Compatibility modes:
# BACKWARD — new schema can read old data (safe for consumers to upgrade first)
# FORWARD — old schema can read new data (safe for producers to upgrade first)
# FULL — both backward AND forward compatible
# NONE — no compatibility checks (dangerous!)
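What the compatibility modes mean mechanically comes down to how a reader fills gaps with defaults. A toy decoder (plain Python, not a real Avro implementation) makes the rules concrete:

```python
def decode(record: dict, reader_schema: dict) -> dict:
    """Project a written record onto the reader's schema.
    Missing fields fall back to the reader's default; a required field
    with no value and no default means the schemas are incompatible."""
    out = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        else:
            raise ValueError(f"incompatible: no value or default for {name!r}")
    return out

v1 = {"fields": [{"name": "user_id"}, {"name": "event_type"}]}
v2 = {"fields": [{"name": "user_id"}, {"name": "event_type"},
                 {"name": "region", "default": "unknown"}]}

old_data = {"user_id": "u1", "event_type": "click"}
# BACKWARD: a v2 reader handles v1 data via the default
print(decode(old_data, v2))
# → {'user_id': 'u1', 'event_type': 'click', 'region': 'unknown'}

new_data = {"user_id": "u1", "event_type": "click", "region": "eu"}
# FORWARD: a v1 reader simply ignores the extra field
print(decode(new_data, v1))
# → {'user_id': 'u1', 'event_type': 'click'}
```

Had `region` been added without a default, the v2-reads-v1 case would raise, which is exactly the change a registry in BACKWARD mode rejects.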
Data Quality Checks in Pipelines
# Great Expectations: data quality in Spark pipelines
import great_expectations as gx
from datetime import datetime, timedelta
context = gx.get_context()
# Define expectations for an orders dataset
validator = context.sources.pandas_default.read_dataframe(orders_df)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=1_000_000)
validator.expect_column_values_to_be_in_set("status", ["PENDING", "COMPLETED", "CANCELLED"])
validator.expect_column_pair_values_a_to_be_greater_than_b("shipped_at", "created_at")
validator.expect_column_values_to_match_regex("email", r"^[^@]+@[^@]+\.[^@]+$")
# Freshness check: no data older than 24 hours
validator.expect_column_max_to_be_between(
"created_at",
min_value=(datetime.now() - timedelta(hours=24)).isoformat())
results = validator.validate()
if not results.success:
alert_on_call_engineer(results) # PagerDuty / Slack alert
quarantine_bad_records(results) # move to dead-letter table
Backfill Strategies
Backfilling — reprocessing historical data through a new or updated pipeline — is one of the hardest operational challenges in data engineering.
| Strategy | How It Works | Pros | Cons |
|---|---|---|---|
| Full replay | Re-read entire Kafka log or data lake | Simple, correct | Slow, expensive, Kafka retention limits |
| Dual-write | Run old + new pipeline simultaneously | Safe, can compare outputs | 2× resource cost |
| Partition-based | Reprocess one date partition at a time | Incremental, resumable | Complex scheduling |
| Blue-green | Write to new table, swap when done | Zero downtime | Needs 2× storage temporarily |
# Airflow: partition-based backfill with idempotent tasks
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_partition(ds, **kwargs):
    """Process a single date partition. Idempotent: overwrites output."""
    spark = get_spark_session()   # project-specific helper
    # Read only this partition
    events = spark.read.parquet(f"s3://data-lake/events/date={ds}/")
    # Transform (project-specific business logic)
    result = transform_events(events)
    # Write with overwrite — makes this idempotent: re-running a date
    # replaces its output instead of appending duplicates
    result.write.mode("overwrite").parquet(
        f"s3://warehouse/transformed/date={ds}/")


dag = DAG(
    "backfill_pipeline",
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 12, 31),
    schedule_interval="@daily",
    max_active_runs=8,   # parallelize: process 8 days at once
    catchup=True,        # process all historical dates
)

process = PythonOperator(
    task_id="process_partition",
    python_callable=process_partition,
    dag=dag,
)
# Run backfill:
# airflow dags backfill backfill_pipeline \
# --start-date 2024-01-01 --end-date 2024-12-31 \
# --reset-dagruns
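The blue-green strategy from the table can be sketched as a pointer swap: build the replacement table under a new name, sanity-check it against the live one, then atomically repoint readers. A minimal sketch, with an in-memory dict standing in for the metastore and all names illustrative:

```python
def blue_green_backfill(catalog, view, new_physical, build, min_row_ratio=0.99):
    """Reprocess into a new physical table, validate, then repoint the view."""
    # Build the replacement table (2x storage while both versions exist)
    catalog["physical"][new_physical] = build()

    old_physical = catalog["views"][view]
    old_n = len(catalog["physical"][old_physical])
    new_n = len(catalog["physical"][new_physical])
    # Guard against silent data loss before exposing the new table
    if old_n and new_n < min_row_ratio * old_n:
        raise RuntimeError(f"suspicious row drop: {old_n} -> {new_n}")

    catalog["views"][view] = new_physical   # atomic cutover for readers
    return old_physical                     # keep old table for instant rollback

catalog = {
    "physical": {"orders_v1": [{"id": 1}, {"id": 2}]},
    "views": {"orders": "orders_v1"},   # consumers always query the view
}
blue_green_backfill(catalog, "orders", "orders_v2",
                    lambda: [{"id": 1}, {"id": 2}, {"id": 3}])
```

In a real warehouse the swap step would map to something like Snowflake's `ALTER TABLE ... SWAP WITH` or a `CREATE OR REPLACE VIEW`, so readers never observe a half-written table.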
Real-World Pipeline Architecture
A production-grade data platform at scale typically combines multiple patterns:
# Production Data Platform Architecture (e-commerce scale)
#
# ┌─────────────────────────────────────────────────────────────────────────┐
# │ DATA SOURCES │
# │ Web App │ Mobile App │ Payment Service │ Inventory │ Third-party APIs │
# └─────┬─────────┬──────────────┬────────────┬─────────────┬──────────────┘
# │ │ │ │ │
# ▼ ▼ ▼ ▼ ▼
# ┌─────────────────────────────────────────────────────────────────────────┐
# │ INGESTION LAYER │
# │ Kafka (event streams) │ Debezium CDC │ Fivetran (SaaS connectors) │
# └──────────────┬────────────────┬──────────────────┬─────────────────────┘
# │ │ │
# ┌────────┴───────┐ │ │
# ▼ ▼ ▼ ▼
# ┌───────────┐ ┌────────────────────────────────────────────────────────┐
# │ SPEED │ │ DATA LAKE (S3 / GCS / ADLS) │
# │ LAYER │ │ Raw Zone │ Staging Zone │ Curated Zone │
# │ Flink │ │ (Parquet) │ (cleaned) │ (analytics-ready) │
# │ (real- │ └──────┬─────────────┬────────────────┬──────────────────┘
# │ time) │ │ │ │
# └─────┬─────┘ ▼ ▼ ▼
# │ ┌─────────────────────────────────────────────────────────┐
# │ │ BATCH PROCESSING (Spark + Airflow + dbt) │
# │ │ Data quality checks (Great Expectations) │
# │ │ Feature engineering for ML │
# │ │ Aggregations, joins, deduplication │
# │ └──────────────────────────┬──────────────────────────────┘
# │ │
# ▼ ▼
# ┌─────────────────────────────────────────────────────────────────────────┐
# │ SERVING LAYER │
# │ Snowflake/BigQuery │ Elasticsearch │ Redis │ Druid │ Pinot │
# │ (analytics) │ (search) │ (cache)│ (OLAP)│ (real-time) │
# └──────────────────────────────────┬──────────────────────────────────────┘
# │
# ▼
# ┌─────────────────────────────────────────────────────────────────────────┐
# │ CONSUMPTION LAYER │
# │ Dashboards (Looker) │ ML Models │ APIs │ Alerts │ Reports │
# └─────────────────────────────────────────────────────────────────────────┘
Key Takeaways
- Batch vs stream is not either/or — most platforms use both. Choose based on latency requirements and data volume.
- ELT has won for analytics. Modern warehouses are powerful enough to transform in-place, preserving raw data for flexibility.
- Lambda architecture provides correctness + low latency, but at the cost of maintaining two codebases. Consider it when you can't trust your streaming layer alone.
- Kappa architecture simplifies operations to a single stream path. Viable when Kafka retention is long enough and your streaming engine is mature.
- Flink is the gold standard for true streaming — event-time processing, watermarks, exactly-once, and sophisticated windowing.
- Spark dominates batch and is excellent for micro-batch streaming (Structured Streaming).
- Kafka Streams is ideal for lightweight, microservice-level stream processing without additional infrastructure.
- Windowing is fundamental: tumbling for periodic counts, sliding for moving averages, session for user behavior analysis.
- Exactly-once requires coordination between source, processor, and sink (checkpoints + transactions + idempotency).
- Schema evolution and data quality checks are not optional — they prevent silent data corruption.
- Backfill strategies must be designed upfront. Partition-based, idempotent reprocessing is the safest approach.
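To make the windowing takeaway concrete: a tumbling window partitions event time into fixed, non-overlapping buckets, and grouping on the event's own timestamp (not arrival order) is what keeps out-of-order events in the right window. A minimal engine-free sketch in plain Python (real engines such as Flink layer watermarks and state stores on top of the same idea):

```python
from collections import defaultdict

def tumbling_window_counts(events, window_ms):
    """Count events per (window_start, key) using event-time tumbling windows.

    `events` are (event_time_ms, key) pairs; each event lands in exactly
    one window [start, start + window_ms), keyed by the window start.
    """
    counts = defaultdict(int)
    for ts, key in events:
        window_start = (ts // window_ms) * window_ms   # align to window boundary
        counts[(window_start, key)] += 1
    return dict(counts)

# Note the out-of-order (2_500, "view") event: it still lands in window 0
clicks = [(1_000, "buy"), (4_999, "buy"), (5_000, "buy"), (2_500, "view")]
tumbling_window_counts(clicks, window_ms=5_000)
# -> {(0, 'buy'): 2, (5000, 'buy'): 1, (0, 'view'): 1}
```

A sliding window would assign each event to every window whose range covers it, and a session window would merge events separated by less than a gap timeout; both change only the window-assignment step above.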