Design: Ad Click Aggregator
The Problem
Online advertising is a $600+ billion industry, and at its core lies a deceptively simple question: How many times was this ad clicked? The answer drives billing (advertisers pay per click), optimization (which ad creatives perform better), and fraud detection (are these clicks legitimate?).
At the scale of a major ad network — Google Ads, Meta Ads, or a large DSP — this question must be answered billions of times per day, with results available in near real-time (seconds, not hours), while ensuring every click is counted exactly once.
A naive `SELECT COUNT(*) FROM clicks WHERE ad_id = X AND ts > NOW() - INTERVAL 5 MINUTE` works fine at 1,000 clicks/day. At 10 billion clicks/day (~115,000 clicks/second), you need a fundamentally different architecture.
Requirements
Functional Requirements
- Count ad clicks in real-time — aggregate clicks per ad across multiple time windows (1-minute, 5-minute, 1-hour, 1-day)
- Support analytical queries — "How many clicks did ad X receive in the last 5 minutes?" with sub-second latency
- Deduplicate clicks — same user clicking the same ad within 1 minute counts as 1 click
- Support filtering dimensions — aggregate by ad_id, campaign_id, advertiser_id, country, device_type
- Historical queries — query click data for any time range in the past (up to 2 years)
- Fraud detection — flag abnormal click patterns for investigation
Non-Functional Requirements
- Scale: 10 billion clicks/day → ~115K clicks/second average, ~500K/second peak
- Latency: Real-time aggregates available within 5 seconds of the click
- Accuracy: Exactly-once counting — no missed clicks, no double-counting
- Availability: 99.99% uptime — ad billing cannot go down
- Durability: Zero data loss — every click event must be persisted
Back-of-Envelope Estimation
| Metric | Calculation | Result |
|---|---|---|
| Daily clicks | Given | 10 billion |
| Avg clicks/second | 10B / 86,400 | ~115,740 / sec |
| Peak clicks/second | ~4× average | ~500,000 / sec |
| Click event size | JSON: ~500 bytes | 500 B |
| Daily raw data | 10B × 500 B | ~5 TB / day |
| Daily ingestion bandwidth | 5 TB / 86,400 | ~58 MB / sec avg |
| Yearly raw storage | 5 TB × 365 | ~1.8 PB / year |
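To sanity-check the table's arithmetic, a few lines of Java (no assumptions beyond the stated inputs):

// Back-of-envelope sanity check for the estimates above
public class ClickEstimates {
    public static void main(String[] args) {
        long dailyClicks = 10_000_000_000L;
        long secondsPerDay = 86_400;
        long avgPerSec = dailyClicks / secondsPerDay;           // ~115,740/sec
        long peakPerSec = avgPerSec * 4;                        // ~463K, rounded to ~500K
        long eventBytes = 500;
        double dailyTb = dailyClicks * eventBytes / 1e12;       // ~5 TB/day
        double ingestMbPerSec = dailyTb * 1e12 / secondsPerDay / 1e6; // ~58 MB/sec
        double yearlyPb = dailyTb * 365 / 1000;                 // ~1.8 PB/year
        System.out.printf("avg=%d/s peak=%d/s daily=%.1f TB ingest=%.0f MB/s yearly=%.2f PB%n",
                avgPerSec, peakPerSec, dailyTb, ingestMbPerSec, yearlyPb);
    }
}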
Data Model
Click Event Schema
Each ad click generates an event with the following structure:
{
"click_id": "c-8f14e45f-ceea-362a-929f-5c3b6e1d7a12",
"ad_id": "ad-00042837",
"campaign_id": "camp-001294",
"advertiser_id": "adv-000512",
"user_id": "u-7a3b1f9e",
"timestamp": 1714567890123,
"ip_address": "203.0.113.42",
"country": "US",
"device_type": "mobile",
"os": "iOS",
"browser": "Safari",
"referrer_url": "https://example.com/article/123",
"landing_url": "https://advertiser.com/product/456",
"is_organic": false
}
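The Java snippets throughout this design reference a ClickEvent type with getters for these fields. It is not defined in the original, so here is a minimal sketch (field names are assumptions mirroring the JSON schema above):

// Minimal ClickEvent POJO (assumed; the original does not define it).
// Jackson maps the snake_case JSON fields via @JsonProperty.
import com.fasterxml.jackson.annotation.JsonProperty;

public class ClickEvent {
    @JsonProperty("click_id")      private String clickId;
    @JsonProperty("ad_id")         private String adId;
    @JsonProperty("campaign_id")   private String campaignId;
    @JsonProperty("advertiser_id") private String advertiserId;
    @JsonProperty("user_id")       private String userId;
    @JsonProperty("timestamp")     private long timestamp;    // epoch millis
    @JsonProperty("ip_address")    private String ipAddress;
    @JsonProperty("country")       private String country;
    @JsonProperty("device_type")   private String deviceType;

    public ClickEvent() {}  // no-arg constructor required by Jackson and Flink

    public String getClickId()      { return clickId; }
    public String getAdId()         { return adId; }
    public String getCampaignId()   { return campaignId; }
    public String getAdvertiserId() { return advertiserId; }
    public String getUserId()       { return userId; }
    public long   getTimestamp()    { return timestamp; }
    public String getIpAddress()    { return ipAddress; }
    public String getCountry()      { return country; }
    public String getDeviceType()   { return deviceType; }
    // os, browser, referrer_url, landing_url, is_organic follow the same pattern
}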
Aggregated Data Model
Pre-aggregated counts stored per time window:
┌──────────────────────────────────────────────────────────────┐
│ Aggregation Record │
├──────────────────────────────────────────────────────────────┤
│ ad_id: "ad-00042837" │
│ window_start: 2026-04-15T10:05:00Z │
│ window_size: 1_MINUTE │
│ click_count: 1,247 │
│ unique_users: 1,189 │
│ country_breakdown: { "US": 634, "UK": 213, "DE": 178, … } │
│ device_breakdown: { "mobile": 823, "desktop": 424 } │
└──────────────────────────────────────────────────────────────┘
High-Level Architecture
The system follows a streaming-first architecture with a Lambda architecture overlay for correctness guarantees:
┌─────────────────────────────────────────────────┐
│ Ad Click Flow │
└─────────────────────────────────────────────────┘
User clicks ad Ad Server logs click Kafka ingestion
─────────────── ──► ───────────────────── ──► ──────────────────
│
┌───────────────────────┼───────────────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Speed │ │ Batch │ │ Raw Data │
│ Layer │ │ Layer │ │ Lake │
│ (Flink) │ │ (Spark) │ │ (S3/HDFS) │
└──────┬───────┘ └──────┬───────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Real-Time │ │ Batch │
│ Agg Store │ │ Agg Store │
│ (ClickHouse) │ │ (ClickHouse) │
└──────┬───────┘ └──────┬───────┘
│ │
└──────────┬───────────┘
▼
┌──────────────┐
│ Serving │
│ Layer │
│ (Query API) │
└──────┬───────┘
│
▼
┌──────────────┐
│ Dashboards │
│ & Billing │
└──────────────┘
Kafka Ingestion Layer
Apache Kafka serves as the central nervous system of the pipeline. Every click event enters the system through Kafka, providing durability, ordering guarantees (per partition), and decoupling between producers and consumers.
Topic Design
# Primary click events topic — partitioned by ad_id for ordering guarantees
Topic: ad-clicks
Partitions: 256 (supports ~2,000 events/sec per partition)
Replication: 3 (fault tolerance — survive 2 broker failures)
Retention: 7 days (reprocessing window)
Cleanup: delete (compaction not needed — append-only)
# Dead letter queue for malformed / unprocessable events
Topic: ad-clicks-dlq
Partitions: 16
Replication: 3
Retention: 30 days
# Aggregated results (for downstream consumers)
Topic: ad-click-aggregates
Partitions: 64
Replication: 3
Retention: 3 days
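Provisioning these topics programmatically with Kafka's AdminClient might look like the following sketch (broker addresses reused from the producer config below; retention values from the listing above):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic clicks = new NewTopic("ad-clicks", 256, (short) 3)
                    .configs(Map.of("retention.ms", String.valueOf(7L * 24 * 3600 * 1000),
                                    "cleanup.policy", "delete"));
            NewTopic dlq = new NewTopic("ad-clicks-dlq", 16, (short) 3)
                    .configs(Map.of("retention.ms", String.valueOf(30L * 24 * 3600 * 1000)));
            NewTopic aggregates = new NewTopic("ad-click-aggregates", 64, (short) 3)
                    .configs(Map.of("retention.ms", String.valueOf(3L * 24 * 3600 * 1000)));
            admin.createTopics(List.of(clicks, dlq, aggregates)).all().get();
        }
    }
}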
Partitioning Strategy
Partitioning by ad_id is critical: it ensures all clicks for a given ad land on the same partition, enabling correct per-ad counting without cross-partition coordination.
// Kafka producer configuration (Java)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // Wait for all replicas
props.put("retries", 3); // Retry on transient failures
props.put("enable.idempotence", "true"); // Exactly-once producer
props.put("max.in.flight.requests", 5); // Safe with idempotence
props.put("compression.type", "lz4"); // ~70% compression ratio
props.put("linger.ms", 5); // Micro-batch for throughput
props.put("batch.size", 65536); // 64KB batches
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Partition by ad_id — all clicks for same ad go to same partition
ProducerRecord<String, String> record = new ProducerRecord<>(
"ad-clicks",
clickEvent.getAdId(), // key = ad_id → deterministic partition
objectMapper.writeValueAsString(clickEvent)
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Route to DLQ after exhausting retries
deadLetterProducer.send(new ProducerRecord<>("ad-clicks-dlq",
clickEvent.getAdId(), objectMapper.writeValueAsString(clickEvent)));
metrics.counter("click.produce.failed").increment();
} else {
metrics.counter("click.produce.success").increment();
}
});
Real-Time Aggregation Pipeline
The speed layer uses Apache Flink for stateful stream processing with exactly-once semantics. Click events flow through a pipeline of operators: deduplication → windowed aggregation → sink to aggregation store.
Flink Stream Processing
Flink Job — Windowed Click Aggregation
The core Flink job implements deduplication, windowed counting, and exactly-once delivery to the sink:
// Flink Streaming Job: Ad Click Aggregator
public class AdClickAggregatorJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ── Exactly-once checkpointing ──────────────────────────────────
env.enableCheckpointing(60_000); // Checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(120_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// State backend: RocksDB for large state (dedup + window state)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/ad-clicks/");
// ── Kafka Source ────────────────────────────────────────────────
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
.setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
.setTopics("ad-clicks")
.setGroupId("flink-ad-click-aggregator")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setDeserializer(new ClickEventDeserializationSchema())
.build();
DataStream<ClickEvent> clicks = env.fromSource(
source,
WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
.withIdleness(Duration.ofMinutes(1)),
"Kafka-Click-Source"
);
// ── Step 1: Deduplication ───────────────────────────────────────
// Same user + same ad within 60 seconds = 1 click
DataStream<ClickEvent> deduplicated = clicks
.keyBy(e -> e.getUserId() + ":" + e.getAdId())
.process(new ClickDeduplicationFunction())
.name("Click-Deduplication");
        // Side-output tag for late events, shared by the window operators below
        final OutputTag<ClickEvent> lateOutputTag = new OutputTag<ClickEvent>("late-clicks") {};
        // ── Step 2: 1-Minute Tumbling Window Aggregation ────────────────
        SingleOutputStreamOperator<ClickAggregate> oneMinAggs = deduplicated // concrete type exposes getSideOutput()
.keyBy(ClickEvent::getAdId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.sideOutputLateData(lateOutputTag)
.aggregate(new ClickCountAggregator(), new ClickWindowFunction())
.name("1-Min-Window-Aggregation");
// ── Step 3: 5-Minute Tumbling Window Aggregation ────────────────
DataStream<ClickAggregate> fiveMinAggs = deduplicated
.keyBy(ClickEvent::getAdId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateOutputTag)
.aggregate(new ClickCountAggregator(), new ClickWindowFunction())
.name("5-Min-Window-Aggregation");
// ── Step 4: 1-Hour Tumbling Window Aggregation ──────────────────
DataStream<ClickAggregate> hourlyAggs = deduplicated
.keyBy(ClickEvent::getAdId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(5))
.sideOutputLateData(lateOutputTag)
.aggregate(new ClickCountAggregator(), new ClickWindowFunction())
.name("1-Hour-Window-Aggregation");
// ── Step 5: Sink to ClickHouse ──────────────────────────────────
        // One sink instance; it routes each aggregate to the table matching
        // its window size (1m / 5m / 1h) — see flushBuffer() below
        oneMinAggs.union(fiveMinAggs, hourlyAggs)
.addSink(new ClickHouseAggSink("jdbc:clickhouse://clickhouse:8123/ad_analytics"))
.name("ClickHouse-Sink");
// ── Late data handling ──────────────────────────────────────────
        oneMinAggs.getSideOutput(lateOutputTag)
            .sinkTo(KafkaSink.<ClickEvent>builder()
                .setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<ClickEvent>builder()
                    .setTopic("ad-clicks-late")
                    // Serializer mirroring ClickEventDeserializationSchema
                    .setValueSerializationSchema(new ClickEventSerializationSchema())
                    .build())
                .build())
            .name("Late-Data-Sink");
env.execute("Ad Click Aggregator Pipeline");
}
}
Deduplication Function
The deduplication operator uses Flink's keyed state with a TTL to track recently seen clicks. It keys by (user_id, ad_id) and drops duplicate clicks within a 60-second window:
public class ClickDeduplicationFunction
extends KeyedProcessFunction<String, ClickEvent, ClickEvent> {
    // Side output for operational metrics (assumed: consumed by a metrics job)
    private static final OutputTag<Metric> metricsTag =
        new OutputTag<Metric>("dedup-metrics") {};
    // State: timestamp of last seen click for this (user, ad) pair
    private ValueState<Long> lastSeenState;
@Override
public void open(Configuration params) {
ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>(
"last-seen-ts", Types.LONG);
// Auto-expire state after 90 seconds (slightly > dedup window)
StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.seconds(90))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(
StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000)
.build();
desc.enableTimeToLive(ttl);
lastSeenState = getRuntimeContext().getState(desc);
}
@Override
public void processElement(ClickEvent click, Context ctx,
Collector<ClickEvent> out) throws Exception {
Long lastSeen = lastSeenState.value();
long now = click.getTimestamp();
if (lastSeen == null || (now - lastSeen) > 60_000) {
// New click or outside dedup window → emit
lastSeenState.update(now);
out.collect(click);
ctx.output(metricsTag, new Metric("click.unique", 1));
} else {
// Duplicate within 60s → suppress
ctx.output(metricsTag, new Metric("click.duplicate", 1));
}
}
}
Aggregation Function
public class ClickCountAggregator
implements AggregateFunction<ClickEvent, ClickAccumulator, ClickAccumulator> {
@Override
public ClickAccumulator createAccumulator() {
return new ClickAccumulator();
}
@Override
public ClickAccumulator add(ClickEvent click, ClickAccumulator acc) {
acc.totalClicks++;
acc.uniqueUsers.add(click.getUserId());
acc.countryBreakdown.merge(click.getCountry(), 1L, Long::sum);
acc.deviceBreakdown.merge(click.getDeviceType(), 1L, Long::sum);
return acc;
}
@Override
public ClickAccumulator getResult(ClickAccumulator acc) {
return acc;
}
@Override
public ClickAccumulator merge(ClickAccumulator a, ClickAccumulator b) {
a.totalClicks += b.totalClicks;
a.uniqueUsers.addAll(b.uniqueUsers);
b.countryBreakdown.forEach((k, v) ->
a.countryBreakdown.merge(k, v, Long::sum));
b.deviceBreakdown.forEach((k, v) ->
a.deviceBreakdown.merge(k, v, Long::sum));
return a;
}
}
// Window function adds window metadata to the aggregate
public class ClickWindowFunction
extends ProcessWindowFunction<ClickAccumulator, ClickAggregate, String, TimeWindow> {
@Override
public void process(String adId, Context ctx,
Iterable<ClickAccumulator> accs,
Collector<ClickAggregate> out) {
ClickAccumulator acc = accs.iterator().next();
TimeWindow window = ctx.window();
out.collect(new ClickAggregate(
adId,
Instant.ofEpochMilli(window.getStart()),
Instant.ofEpochMilli(window.getEnd()),
                windowSizeName(window),  // "1m" / "5m" / "1h" from window length (helper elided)
acc.totalClicks,
acc.uniqueUsers.size(),
acc.countryBreakdown,
acc.deviceBreakdown
));
}
}
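The ClickAccumulator and ClickAggregate types used above are not defined in the text. A minimal sketch follows — an assumption, with field names chosen to match the constructor call in ClickWindowFunction:

import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Mutable accumulator updated as events arrive in the window (assumed definition)
class ClickAccumulator {
    public long totalClicks = 0;
    // Exact uniques; at very high cardinality a HyperLogLog sketch is the usual substitute
    public Set<String> uniqueUsers = new HashSet<>();
    public Map<String, Long> countryBreakdown = new HashMap<>();
    public Map<String, Long> deviceBreakdown = new HashMap<>();
}

// Immutable result emitted once per (ad_id, window)
class ClickAggregate {
    public final String adId;
    public final Instant windowStart;
    public final Instant windowEnd;
    public final String windowSize;   // "1m" / "5m" / "1h"
    public final long clickCount;
    public final long uniqueUsers;
    public final Map<String, Long> countryBreakdown;
    public final Map<String, Long> deviceBreakdown;

    ClickAggregate(String adId, Instant windowStart, Instant windowEnd,
                   String windowSize, long clickCount, long uniqueUsers,
                   Map<String, Long> countryBreakdown,
                   Map<String, Long> deviceBreakdown) {
        this.adId = adId;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.windowSize = windowSize;
        this.clickCount = clickCount;
        this.uniqueUsers = uniqueUsers;
        this.countryBreakdown = countryBreakdown;
        this.deviceBreakdown = deviceBreakdown;
    }
}

Note that the ClickHouse sink shown later also reads campaignId, advertiserId, and per-row country/device values; a full implementation would explode the breakdown maps into per-dimension rows (or key windows on the full dimension tuple) before the sink. That plumbing is elided here.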
Click Deduplication Strategies
Deduplication is critical for accurate billing. A user accidentally double-clicking an ad should not charge the advertiser twice. There are multiple approaches, each with distinct trade-offs:
Strategy Comparison
| Strategy | Space | Accuracy | Latency | Use Case |
|---|---|---|---|---|
| Flink Keyed State | ~20 bytes/key, RocksDB | Exact (within TTL) | Microseconds | Primary dedup in stream |
| Redis SET with TTL | ~100 bytes/key | Exact | ~1 ms (network) | Cross-service dedup |
| Bloom Filter | ~10 bits/element | ~1% false positives | Microseconds | Pre-filter before exact check |
| Counting Bloom Filter | ~40 bits/element | ~1% FP, supports delete | Microseconds | Time-window rotation |
Redis-Based Deduplication
For cross-service deduplication (e.g., the ad server itself deduplicating before sending to Kafka):
# Redis deduplication with 60-second TTL
# Key format: dedup:{user_id}:{ad_id}
# Value: 1 (existence check only)
def is_duplicate_click(redis_client, user_id, ad_id, ttl_seconds=60):
"""
Returns True if this click is a duplicate (should be suppressed).
Uses SET NX (set-if-not-exists) for atomic check-and-set.
"""
key = f"dedup:{user_id}:{ad_id}"
# SET key 1 NX EX 60
# NX = only set if not exists
# EX = expire after 60 seconds
was_set = redis_client.set(key, 1, nx=True, ex=ttl_seconds)
if was_set:
# Key didn't exist → first click → NOT a duplicate
return False
else:
# Key already existed → duplicate within TTL window
return True
# Memory estimate for Redis dedup:
# 115K clicks/sec × 60s TTL = ~6.9M active keys
# ~100 bytes per key (key + overhead) = ~690 MB
# With Redis Cluster (3 shards) = ~230 MB per shard
Bloom Filter Pre-Filtering
For extremely high throughput, use a two-layer dedup: Bloom filter as a fast pre-filter, then exact check only for potential duplicates:
public class TwoLayerDeduplication {
// Rotating Bloom filters — current minute + previous minute
private BloomFilter<String> currentMinuteFilter;
private BloomFilter<String> previousMinuteFilter;
private long currentMinuteTs;
// Expected elements: 115K clicks/sec × 60s = ~7M per minute
    // Target FPR: 0.1% → ~14.4 bits/element → ~12.6 MB per filter
private static final int EXPECTED_INSERTIONS = 7_000_000;
private static final double FPP = 0.001;
public TwoLayerDeduplication() {
this.currentMinuteFilter = BloomFilter.create(
Funnels.stringFunnel(StandardCharsets.UTF_8),
EXPECTED_INSERTIONS, FPP);
this.previousMinuteFilter = BloomFilter.create(
Funnels.stringFunnel(StandardCharsets.UTF_8),
EXPECTED_INSERTIONS, FPP);
this.currentMinuteTs = currentMinute();
}
public boolean isDuplicate(String userId, String adId) {
rotateIfNeeded();
String key = userId + ":" + adId;
// Fast path: if NOT in either Bloom filter → definitely unique
if (!currentMinuteFilter.mightContain(key) &&
!previousMinuteFilter.mightContain(key)) {
currentMinuteFilter.put(key);
return false;
}
// Slow path: Bloom filter says "maybe" → check exact store
// (Redis or Flink state)
return checkExactStore(key);
}
private void rotateIfNeeded() {
long now = currentMinute();
if (now > currentMinuteTs) {
previousMinuteFilter = currentMinuteFilter;
currentMinuteFilter = BloomFilter.create(
Funnels.stringFunnel(StandardCharsets.UTF_8),
EXPECTED_INSERTIONS, FPP);
currentMinuteTs = now;
}
}
    // Helpers (assumed, elided in the original)
    private long currentMinute() {
        return System.currentTimeMillis() / 60_000;  // wall-clock minute bucket
    }

    private boolean checkExactStore(String key) {
        // Authoritative lookup — e.g., the Redis SET NX pattern shown above,
        // or Flink keyed state when running inside the stream
        throw new UnsupportedOperationException("wire to Redis or Flink state");
    }
}
Aggregation Store — ClickHouse
ClickHouse is an ideal aggregation store for this use case: it's a columnar OLAP database designed for real-time analytical queries over billions of rows, with sub-second latency for aggregation queries.
ClickHouse Schema
-- Raw click events (for batch reprocessing and auditing)
CREATE TABLE ad_clicks_raw ON CLUSTER '{cluster}'
(
click_id String,
ad_id LowCardinality(String),
campaign_id LowCardinality(String),
advertiser_id LowCardinality(String),
user_id String,
timestamp DateTime64(3, 'UTC'),
ip_address IPv4,
country LowCardinality(FixedString(2)),
device_type Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
os LowCardinality(String),
browser LowCardinality(String),
referrer_url String,
landing_url String,
is_duplicate UInt8 DEFAULT 0,
-- Partition and sort keys for efficient scans
event_date Date DEFAULT toDate(timestamp)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ad_clicks_raw',
'{replica}')
PARTITION BY toYYYYMMDD(event_date)
ORDER BY (ad_id, timestamp, click_id)
TTL event_date + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;
-- Pre-aggregated 1-minute click counts
CREATE TABLE ad_click_counts_1m ON CLUSTER '{cluster}'
(
ad_id LowCardinality(String),
campaign_id LowCardinality(String),
advertiser_id LowCardinality(String),
window_start DateTime('UTC'),
window_end DateTime('UTC'),
click_count UInt64,
    unique_users UInt64,  -- caution: summing pre-aggregated uniques over-counts across rows; use uniqState + AggregatingMergeTree for exact cross-window uniques
country LowCardinality(FixedString(2)),
device_type Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
event_date Date DEFAULT toDate(window_start)
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ad_click_counts_1m',
'{replica}',
(click_count, unique_users))
PARTITION BY toYYYYMM(event_date)
ORDER BY (ad_id, country, device_type, window_start)
TTL event_date + INTERVAL 2 YEAR
SETTINGS index_granularity = 8192;
-- Materialized view: auto-aggregate raw → 1-minute
CREATE MATERIALIZED VIEW ad_click_counts_1m_mv ON CLUSTER '{cluster}'
TO ad_click_counts_1m
AS
SELECT
ad_id,
campaign_id,
advertiser_id,
toStartOfMinute(timestamp) AS window_start,
toStartOfMinute(timestamp) + INTERVAL 1 MINUTE AS window_end,
count() AS click_count,
uniqExact(user_id) AS unique_users,
country,
device_type
FROM ad_clicks_raw
WHERE is_duplicate = 0
GROUP BY ad_id, campaign_id, advertiser_id, window_start,
window_end, country, device_type;
-- 1-hour rollup from 1-minute aggregates
CREATE TABLE ad_click_counts_1h ON CLUSTER '{cluster}'
(
ad_id LowCardinality(String),
campaign_id LowCardinality(String),
advertiser_id LowCardinality(String),
window_start DateTime('UTC'),
window_end DateTime('UTC'),
click_count UInt64,
unique_users UInt64,
country LowCardinality(FixedString(2)),
device_type Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
event_date Date DEFAULT toDate(window_start)
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ad_click_counts_1h',
'{replica}',
(click_count, unique_users))
PARTITION BY toYYYYMM(event_date)
ORDER BY (ad_id, country, device_type, window_start)
TTL event_date + INTERVAL 2 YEAR
SETTINGS index_granularity = 8192;
Query Examples
-- Query 1: Clicks for ad X in the last 5 minutes (real-time)
SELECT
sum(click_count) AS total_clicks,
sum(unique_users) AS total_unique
FROM ad_click_counts_1m
WHERE ad_id = 'ad-00042837'
AND window_start >= now() - INTERVAL 5 MINUTE;
-- Response time: ~5ms (scans a few hundred rows per shard:
-- 5 minute-buckets × country × device combinations)
-- Query 2: Hourly trend for campaign over 7 days
SELECT
toStartOfHour(window_start) AS hour,
sum(click_count) AS clicks,
sum(unique_users) AS uniques
FROM ad_click_counts_1h
WHERE campaign_id = 'camp-001294'
AND window_start >= now() - INTERVAL 7 DAY
GROUP BY hour
ORDER BY hour;
-- Response time: ~50ms (168 hourly buckets)
-- Query 3: Top ads by country breakdown
SELECT
ad_id,
country,
sum(click_count) AS clicks
FROM ad_click_counts_1h
WHERE advertiser_id = 'adv-000512'
AND window_start >= now() - INTERVAL 24 HOUR
GROUP BY ad_id, country
ORDER BY clicks DESC
LIMIT 100;
-- Response time: ~100ms
Lambda Architecture
Real-time stream processing is fast but can drift from ground truth due to late-arriving events, reprocessing after failures, or edge cases in deduplication. The Lambda architecture addresses this by running two parallel computation paths:
- Speed Layer (Flink) — provides low-latency approximate results within seconds. Handles dedup and windowed aggregation in real-time. May slightly over- or under-count due to late arrivals.
- Batch Layer (Spark) — reprocesses raw click data periodically (every 1–6 hours). Produces exact, reconciled counts by processing the complete dataset with full deduplication.
- Serving Layer (Query API) — merges results from both layers. For recent data (< 2 hours), serve speed layer results. For older data, serve batch layer results. During overlap, prefer batch results.
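A sketch of the serving layer's routing rule (class name and horizon constant are illustrative, not from the original):

import java.time.Duration;
import java.time.Instant;

// Serving-layer routing: batch results are authoritative once available;
// the speed layer covers only the freshest windows (illustrative sketch)
public class ServingRouter {
    // Batch cadence + safety margin — 2 hours, per the text above
    private static final Duration BATCH_HORIZON = Duration.ofHours(2);

    /** Table to query for a window starting at windowStart. */
    public String tableFor(Instant windowStart) {
        Instant batchCutoff = Instant.now().minus(BATCH_HORIZON);
        return windowStart.isBefore(batchCutoff)
                ? "ad_click_counts_1m_batch"  // batch layer: exact, reconciled
                : "ad_click_counts_1m";       // speed layer: fresh, approximate
    }
}

A query spanning the cutoff is split into two sub-queries — batch for the older part, speed for the newer — and the partial sums are merged.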
Batch Layer — Spark Aggregation
# Spark batch aggregation job (runs every hour)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("AdClickBatchAggregator") \
.config("spark.sql.shuffle.partitions", 256) \
.config("spark.sql.adaptive.enabled", True) \
.getOrCreate()
# Read raw click events from data lake (S3/HDFS)
raw_clicks = spark.read.parquet(
"s3://ad-data-lake/clicks/",
mergeSchema=True
).filter(
F.col("event_date") >= F.date_sub(F.current_date(), 2)
)
# Step 1: Exact deduplication using window functions
# Keep a click only if it is > 60s after the previous raw click for the same
# (user_id, ad_id) — a close approximation of the streaming dedup rule
dedup_window = Window.partitionBy("user_id", "ad_id") \
.orderBy("timestamp")
deduped = raw_clicks \
.withColumn("prev_ts",
F.lag("timestamp").over(dedup_window)) \
.filter(
F.col("prev_ts").isNull() | # first click ever
(F.unix_timestamp("timestamp") -
F.unix_timestamp("prev_ts") > 60) # > 60s gap
) \
.drop("prev_ts")
# Step 2: Aggregate into 1-minute windows
minute_aggs = deduped \
.withColumn("window_start",
F.date_trunc("minute", "timestamp")) \
.groupBy("ad_id", "campaign_id", "advertiser_id",
"window_start", "country", "device_type") \
.agg(
F.count("*").alias("click_count"),
F.countDistinct("user_id").alias("unique_users")
) \
.withColumn("window_end",
F.col("window_start") + F.expr("INTERVAL 1 MINUTE")) \
.withColumn("source", F.lit("batch"))
# Step 3: Write to batch aggregation store
minute_aggs.write \
.format("jdbc") \
.option("url", "jdbc:clickhouse://clickhouse:8123/ad_analytics") \
.option("dbtable", "ad_click_counts_1m_batch") \
.option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
.mode("overwrite") \
.save()
print(f"Batch aggregation complete: {minute_aggs.count()} records")
Data Reconciliation
The reconciliation service compares speed layer and batch layer results, flagging and correcting discrepancies:
-- Reconciliation query: compare real-time vs batch counts
-- Run after each batch job completes
SELECT
rt.ad_id,
rt.window_start,
rt.click_count AS realtime_count,
bt.click_count AS batch_count,
abs(rt.click_count - bt.click_count) AS discrepancy,
round(abs(rt.click_count - bt.click_count) * 100.0
/ bt.click_count, 2) AS discrepancy_pct
FROM ad_click_counts_1m AS rt
JOIN ad_click_counts_1m_batch AS bt
ON rt.ad_id = bt.ad_id
AND rt.window_start = bt.window_start
AND rt.country = bt.country
AND rt.device_type = bt.device_type
WHERE bt.window_start >= now() - INTERVAL 6 HOUR
AND abs(rt.click_count - bt.click_count) > 0
ORDER BY discrepancy_pct DESC
LIMIT 100;
-- Auto-correct: replace real-time counts with batch counts
-- for windows where batch data is available
INSERT INTO ad_click_counts_1m
SELECT
bt.ad_id, bt.campaign_id, bt.advertiser_id,
bt.window_start, bt.window_end,
bt.click_count, bt.unique_users,
bt.country, bt.device_type, toDate(bt.window_start)
FROM ad_click_counts_1m_batch AS bt
WHERE bt.window_start >= now() - INTERVAL 6 HOUR;
-- CAUTION: with SummingMergeTree this INSERT *adds* the batch rows to the
-- existing real-time rows on the next merge — it does not replace them.
-- For true correction, drop the affected partitions first, or use
-- ReplacingMergeTree keyed on (ad_id, window_start, country, device_type).
Exactly-Once Counting
Achieving exactly-once semantics end-to-end requires coordination across three layers:
End-to-End Exactly-Once Pipeline
┌─────────────────────────────────────────────────────────────────────┐
│ Exactly-Once Guarantee Chain │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Producer Kafka Flink ClickHouse │
│ ───────── ───── ───── ────────── │
│ idempotent → exactly-once → checkpointed → idempotent │
│ producer replication state + offsets upserts │
│ │
│ acks=all min.isr=2 checkpoint ReplacingMerge │
│ retries=3 unclean=false interval=60s Tree or │
│ idempotence log repl=3 RocksDB state deduplicate │
│ =true backend on INSERT │
│ │
└─────────────────────────────────────────────────────────────────────┘
Flink checkpointing is the linchpin. During a checkpoint, Flink atomically snapshots:
- Kafka consumer offsets — exactly which messages have been processed
- Operator state — window accumulator contents, dedup state
- Sink pre-commit — buffered writes to ClickHouse
On failure recovery, Flink restores from the last successful checkpoint: it resets Kafka offsets, restores operator state, and replays from the checkpoint position. Any writes to ClickHouse are idempotent (using ReplacingMergeTree or dedup on INSERT), so replayed outputs don't cause double-counting.
// Flink ClickHouse sink with idempotent writes
public class ClickHouseAggSink extends RichSinkFunction<ClickAggregate>
implements CheckpointedFunction {
    private ListState<ClickAggregate> bufferedState;
    private final List<ClickAggregate> buffer = new ArrayList<>();
    private transient DataSource dataSource;  // ClickHouse pool, initialized in open() (omitted)
@Override
    public void invoke(ClickAggregate agg, Context ctx) throws Exception {
buffer.add(agg);
if (buffer.size() >= 1000) {
flushBuffer();
}
}
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
// Flush to ClickHouse before checkpoint completes
flushBuffer();
bufferedState.clear();
bufferedState.addAll(buffer);
}
@Override
public void initializeState(FunctionInitializationContext ctx) throws Exception {
bufferedState = ctx.getOperatorStateStore()
.getListState(new ListStateDescriptor<>("ch-buffer",
ClickAggregate.class));
if (ctx.isRestored()) {
// Re-flush any buffered records from pre-failure checkpoint
for (ClickAggregate agg : bufferedState.get()) {
buffer.add(agg);
}
flushBuffer(); // Idempotent — ClickHouse deduplicates
}
}
    private void flushBuffer() throws SQLException {
        if (buffer.isEmpty()) return;
        // INSERT with deduplication — ClickHouse Replicated*MergeTree engines
        // drop identical re-inserted blocks, so a batch replayed after
        // checkpoint recovery is not double-counted.
        // Shown for the 1-minute table; in practice the sink routes each
        // aggregate to the table matching its window size (1m / 5m / 1h).
        String sql = "INSERT INTO ad_click_counts_1m " +
"(ad_id, campaign_id, advertiser_id, window_start, " +
" window_end, click_count, unique_users, country, device_type) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
for (ClickAggregate agg : buffer) {
ps.setString(1, agg.adId);
ps.setString(2, agg.campaignId);
ps.setString(3, agg.advertiserId);
ps.setTimestamp(4, Timestamp.from(agg.windowStart));
ps.setTimestamp(5, Timestamp.from(agg.windowEnd));
ps.setLong(6, agg.clickCount);
ps.setLong(7, agg.uniqueUsers);
ps.setString(8, agg.country);
ps.setString(9, agg.deviceType);
ps.addBatch();
}
ps.executeBatch();
buffer.clear();
}
}
}
Query Service
The query service is a thin API layer that routes queries to the appropriate store based on the time range and freshness requirements:
@RestController
@RequestMapping("/api/v1/clicks")
public class ClickQueryController {
    private final ClickHouseTemplate clickHouse;  // thin JDBC query wrapper (assumed)
    private final CacheManager cache;             // TTL-aware get-or-load cache (assumed)
// GET /api/v1/clicks/ad/{adId}?window=5m
@GetMapping("/ad/{adId}")
public ClickResponse getAdClicks(
@PathVariable String adId,
@RequestParam(defaultValue = "5m") String window,
@RequestParam(required = false) String country,
@RequestParam(required = false) String device) {
Duration dur = parseDuration(window);
String cacheKey = String.format("clicks:%s:%s:%s:%s",
adId, window, country, device);
// Cache short windows briefly (10s), longer windows more (60s)
Duration cacheTtl = dur.toMinutes() <= 5
? Duration.ofSeconds(10)
: Duration.ofSeconds(60);
return cache.get(cacheKey, cacheTtl, () -> {
// Choose table based on window size
String table = chooseTable(dur);
// Build and execute ClickHouse query
return clickHouse.query(
"SELECT sum(click_count) AS clicks, " +
" sum(unique_users) AS uniques " +
"FROM " + table + " " +
"WHERE ad_id = ? " +
" AND window_start >= now() - toIntervalSecond(?) " +
buildFilters(country, device),
adId, dur.getSeconds()
);
});
}
    // Route to the most efficient table: hourly rollups for day-scale
    // windows, minute-level aggregates for anything shorter
    private String chooseTable(Duration window) {
        return window.toHours() >= 24
            ? "ad_click_counts_1h"
            : "ad_click_counts_1m";
    }
}
Fraud Detection
Click fraud costs advertisers an estimated $100 billion annually. The aggregator must detect and filter fraudulent clicks before they affect billing. Fraud signals are evaluated at multiple levels:
Fraud Detection Signals
| Signal | Detection Method | Action |
|---|---|---|
| IP click flooding | > 50 clicks from same IP in 1 minute | Block IP, flag clicks |
| Click farm patterns | High click volume from narrow IP range + low conversion | Flag campaign, alert |
| Bot signatures | Missing/invalid browser fingerprint, datacenter IPs | Drop click |
| Geographic anomalies | Clicks from non-targeted regions | Exclude from billing |
| Timing patterns | Perfectly periodic clicks (e.g., exactly every 5 seconds) | Flag as bot |
| CTR anomalies | Click-through rate > 3σ above campaign average | Hold for review |
Real-Time Fraud Detection in Flink
public class ClickFraudDetector
extends KeyedProcessFunction<String, ClickEvent, FraudAlert> {
// State: sliding window of click timestamps per IP
private MapState<String, List<Long>> ipClickWindows;
// State: per-ad click velocity
private ValueState<ClickVelocity> adVelocity;
@Override
public void open(Configuration params) {
MapStateDescriptor<String, List<Long>> ipDesc =
new MapStateDescriptor<>("ip-clicks", String.class,
TypeInformation.of(new TypeHint<List<Long>>(){}));
StateTtlConfig ipTtl = StateTtlConfig
.newBuilder(Time.minutes(5))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
ipDesc.enableTimeToLive(ipTtl);
ipClickWindows = getRuntimeContext().getMapState(ipDesc);
ValueStateDescriptor<ClickVelocity> velDesc =
new ValueStateDescriptor<>("ad-velocity", ClickVelocity.class);
adVelocity = getRuntimeContext().getState(velDesc);
}
@Override
public void processElement(ClickEvent click, Context ctx,
Collector<FraudAlert> out) throws Exception {
String ip = click.getIpAddress();
long now = click.getTimestamp();
// ── Check 1: IP flooding ────────────────────────────────
List<Long> ipClicks = ipClickWindows.get(ip);
if (ipClicks == null) ipClicks = new ArrayList<>();
// Remove clicks older than 60 seconds
ipClicks.removeIf(ts -> (now - ts) > 60_000);
ipClicks.add(now);
ipClickWindows.put(ip, ipClicks);
if (ipClicks.size() > 50) {
out.collect(new FraudAlert(
FraudType.IP_FLOODING, click,
"IP " + ip + " made " + ipClicks.size() +
" clicks in 60s"));
return; // Don't count this click
}
// ── Check 2: Timing pattern (periodic clicks) ──────────
if (ipClicks.size() >= 5) {
List<Long> intervals = new ArrayList<>();
for (int i = 1; i < ipClicks.size(); i++) {
intervals.add(ipClicks.get(i) - ipClicks.get(i - 1));
}
double stdDev = calculateStdDev(intervals);
if (stdDev < 100) { // Less than 100ms variation = bot
out.collect(new FraudAlert(
FraudType.BOT_PATTERN, click,
"Periodic click pattern (σ=" + stdDev + "ms)"));
return;
}
}
// ── Check 3: Click velocity anomaly ─────────────────────
ClickVelocity vel = adVelocity.value();
if (vel == null) vel = new ClickVelocity();
vel.addClick(now);
adVelocity.update(vel);
if (vel.getCurrentRate() > vel.getMovingAverage() * 3) {
out.collect(new FraudAlert(
FraudType.VELOCITY_ANOMALY, click,
"Click rate " + vel.getCurrentRate() +
"/min vs avg " + vel.getMovingAverage() + "/min"));
}
}
    // Helper: population standard deviation of inter-click intervals (ms)
    private double calculateStdDev(List<Long> values) {
        double mean = values.stream().mapToLong(Long::longValue).average().orElse(0);
        double variance = values.stream()
            .mapToDouble(v -> (v - mean) * (v - mean))
            .average().orElse(0);
        return Math.sqrt(variance);
    }
}
IP-Based Filtering
# IP reputation database: maintain a set of known-bad IPs
# Updated from threat intelligence feeds + own detection
class IPReputationFilter:
def __init__(self, redis_client):
self.redis = redis_client
# Bloom filter for datacenter IP ranges (fast pre-filter)
self.datacenter_ips = load_datacenter_ip_bloom_filter()
def is_suspicious(self, ip_address: str) -> tuple[bool, str]:
        # Check 1: Known bad IP (per-IP blacklist key, so TTLs apply)
        if self.redis.exists(f"ip:blacklist:{ip_address}"):
            return True, "blacklisted"
# Check 2: Datacenter / VPN / Proxy IP
if self.datacenter_ips.might_contain(ip_address):
return True, "datacenter_ip"
# Check 3: Rate limit per IP
key = f"ip:rate:{ip_address}"
count = self.redis.incr(key)
if count == 1:
self.redis.expire(key, 60) # 60-second window
        if count > 100:
            # Auto-blacklist for 1 hour — a per-IP key, since members of a
            # Redis SET cannot expire individually
            self.redis.set(f"ip:blacklist:{ip_address}", 1, ex=3600)
            return True, "rate_limited"
return False, "clean"
Scaling & Operational Concerns
Kafka Scaling
┌────────────────────────────────────────────────────────────────────┐
│ Kafka Cluster Sizing │
├────────────────────────────────────────────────────────────────────┤
│ Brokers: 15 (5 racks × 3 brokers) │
│ Partitions: 256 (ad-clicks topic) │
│ Replication: 3 (cross-rack) │
│ Throughput: ~58 MB/sec ingress, ~174 MB/sec with replication │
│  Disk:       7 days × 5 TB/day × 3 replicas = 105 TB → ~7 TB/broker │
│ Network: Each broker: ~12 MB/sec in, ~35 MB/sec out │
│ (3× for replication + consumer fan-out) │
└────────────────────────────────────────────────────────────────────┘
Flink Scaling
┌────────────────────────────────────────────────────────────────────┐
│ Flink Cluster Sizing │
├────────────────────────────────────────────────────────────────────┤
│ TaskManagers: 32 (8 cores, 32 GB each) │
│ Parallelism: 256 (matches Kafka partitions) │
│ Checkpoint: 60s interval, S3 backend │
│ State size: ~50 GB (dedup state + window accumulators) │
│ RocksDB: 512 MB write buffer per TM, bloom filter on │
│ Watermark delay: 10 seconds (bounded out-of-order) │
│ Recovery time: ~30 seconds from checkpoint │
└────────────────────────────────────────────────────────────────────┘
ClickHouse Cluster
┌────────────────────────────────────────────────────────────────────┐
│ ClickHouse Cluster Sizing │
├────────────────────────────────────────────────────────────────────┤
│ Shards: 8 (distributed across AZs) │
│ Replicas: 2 per shard (16 nodes total) │
│ CPU: 32 cores per node │
│ RAM: 256 GB per node (hot data in memory) │
│ Disk: 8 TB NVMe SSD per node │
│  Data:        Raw clicks ~90 days: 450 TB raw, ~8:1 columnar        │
│               compression → ~56 TB (×2 replicas ≈ 112 TB)           │
│               Aggregates ~2 years (very compact)                    │
│ Queries: Sub-second for recent aggregates │
│ < 5s for full table scans over 24h │
└────────────────────────────────────────────────────────────────────┘
Aggregation Store Alternatives
| Store | Type | Strengths | Weaknesses |
|---|---|---|---|
| ClickHouse | Columnar OLAP | Fastest aggregation queries, great compression, SQL interface | No true upsert, eventual consistency on merges |
| Apache Druid | Real-time OLAP | Native Kafka ingestion, sub-second for recent data, built-in rollup | Complex operations, high memory usage |
| InfluxDB | Time-Series DB | Purpose-built for time-series, built-in downsampling, Flux query language | Limited JOIN support, less flexible dimensions |
| Apache Pinot | Real-time OLAP | Upsert support, low-latency, LinkedIn-scale proven | Smaller community, steeper learning curve |
| Redis + PostgreSQL | Cache + RDBMS | Simple, familiar, good for moderate scale | Won't scale to 10B clicks/day without significant engineering |
Monitoring & Alerting
# Key metrics to monitor
┌──────────────────────────────────────────────────────────────────┐
│ Metric │ Alert Threshold │
├──────────────────────────────────┼───────────────────────────────┤
│ Kafka consumer lag │ > 100K messages │
│ Flink checkpoint duration │ > 120 seconds │
│ Flink checkpoint failures │ > 3 consecutive │
│ End-to-end latency (click→agg) │ > 10 seconds (p99) │
│ Dedup ratio │ < 1% or > 20% (anomaly) │
│ ClickHouse query latency (p99) │ > 500 ms │
│ Fraud alert rate │ > 5% of total clicks │
│ Batch-vs-RT discrepancy │ > 2% per window │
│ Data lake write failures │ Any │
│ Kafka broker disk usage │ > 80% │
└──────────────────────────────────┴───────────────────────────────┘
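As a sketch of how the first alert (consumer lag) might be computed, the Kafka AdminClient can diff the Flink group's committed offsets against the latest partition offsets (group ID and threshold taken from this design; the alerting transport is left out):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets of the Flink consumer group
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("flink-ad-click-aggregator")
                     .partitionsToOffsetAndMetadata().get();
            // Latest (log-end) offsets for the same partitions
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest =
                admin.listOffsets(committed.keySet().stream()
                         .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                     .all().get();
            long totalLag = committed.entrySet().stream()
                .mapToLong(e -> latest.get(e.getKey()).offset() - e.getValue().offset())
                .sum();
            if (totalLag > 100_000) {  // threshold from the table above
                System.err.println("ALERT: consumer lag " + totalLag + " > 100K");
            }
        }
    }
}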
Summary
| Component | Technology | Role |
|---|---|---|
| Ingestion | Apache Kafka (256 partitions) | Durable, ordered event stream |
| Speed Layer | Apache Flink | Real-time dedup + windowed aggregation |
| Batch Layer | Apache Spark | Hourly exact reprocessing |
| Aggregation Store | ClickHouse (8 shards × 2 replicas) | Sub-second OLAP queries |
| Deduplication | Flink state + Redis + Bloom filter | Multi-layer click dedup |
| Fraud Detection | Flink CEP + IP reputation DB | Real-time fraud filtering |
| Data Lake | S3 / HDFS (Parquet format) | Raw event archive for reprocessing |
| Query Service | Spring Boot REST API | Unified query interface with caching |