High Level Design Series · Real-World Designs · Post 65 of 70

Design: Ad Click Aggregator

The Problem

Online advertising is a $600+ billion industry, and at its core lies a deceptively simple question: How many times was this ad clicked? The answer drives billing (advertisers pay per click), optimization (which ad creatives perform better), and fraud detection (are these clicks legitimate?).

At the scale of a major ad network — Google Ads, Meta Ads, or a large DSP — this question must be answered billions of times per day, with results available in near real-time (seconds, not hours), while ensuring every click is counted exactly once.

Why this is hard: A simple SELECT COUNT(*) FROM clicks WHERE ad_id = X AND ts > NOW() - INTERVAL 5 MINUTE would work for 1,000 clicks/day. At 10 billion clicks/day (~115,000 clicks/second), you need a fundamentally different architecture.

Requirements

Functional Requirements

  1. Ingest ad click events and attribute each to an ad, campaign, and advertiser.
  2. Serve aggregated click counts per ad over time windows (last 1 minute, 5 minutes, 1 hour), with breakdowns by country and device type.
  3. Deduplicate repeated clicks (same user, same ad, within a short window) so advertisers are billed at most once per legitimate click.
  4. Detect and exclude fraudulent clicks from billing.

Non-Functional Requirements

  1. Scale: ~10 billion clicks/day (~115K/sec average, ~500K/sec peak).
  2. Latency: aggregated counts available in near real-time (seconds, not hours).
  3. Correctness: every legitimate click counted exactly once, since counts drive billing.
  4. Durability: no click event is lost, even across component failures.

Back-of-Envelope Estimation

┌─────────────────────────────┬────────────────────┬────────────────────┐
│  Metric                     │  Calculation       │  Result            │
├─────────────────────────────┼────────────────────┼────────────────────┤
│  Daily clicks               │  Given             │  10 billion        │
│  Avg clicks/second          │  10B / 86,400      │  ~115,740 / sec    │
│  Peak clicks/second         │  ~4× average       │  ~500,000 / sec    │
│  Click event size           │  JSON: ~500 bytes  │  500 B             │
│  Daily raw data             │  10B × 500 B       │  ~5 TB / day       │
│  Daily ingestion bandwidth  │  5 TB / 86,400     │  ~58 MB / sec avg  │
│  Yearly raw storage         │  5 TB × 365        │  ~1.8 PB / year    │
└─────────────────────────────┴────────────────────┴────────────────────┘

Data Model

Click Event Schema

Each ad click generates an event with the following structure:

{
  "click_id":      "c-8f14e45f-ceea-362a-929f-5c3b6e1d7a12",
  "ad_id":         "ad-00042837",
  "campaign_id":   "camp-001294",
  "advertiser_id": "adv-000512",
  "user_id":       "u-7a3b1f9e",
  "timestamp":     1714567890123,
  "ip_address":    "203.0.113.42",
  "country":       "US",
  "device_type":   "mobile",
  "os":            "iOS",
  "browser":       "Safari",
  "referrer_url":  "https://example.com/article/123",
  "landing_url":   "https://advertiser.com/product/456",
  "is_organic":    false
}
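
The pipeline code throughout this post refers to a ClickEvent class with getters for these fields. The exact class isn't shown; a minimal sketch consistent with the JSON above (only the fields the pipeline code reads are included):

// Hypothetical ClickEvent POJO mirroring the click event schema
public class ClickEvent {
    private String clickId;
    private String adId;
    private String campaignId;
    private String advertiserId;
    private String userId;
    private long   timestamp;      // epoch milliseconds
    private String ipAddress;
    private String country;
    private String deviceType;

    public String getClickId()      { return clickId; }
    public String getAdId()         { return adId; }
    public String getCampaignId()   { return campaignId; }
    public String getAdvertiserId() { return advertiserId; }
    public String getUserId()       { return userId; }
    public long   getTimestamp()    { return timestamp; }
    public String getIpAddress()    { return ipAddress; }
    public String getCountry()      { return country; }
    public String getDeviceType()   { return deviceType; }
}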

Aggregated Data Model

Pre-aggregated counts stored per time window:

┌──────────────────────────────────────────────────────────────┐
│  Aggregation Record                                          │
├──────────────────────────────────────────────────────────────┤
│  ad_id:          "ad-00042837"                               │
│  window_start:   2026-04-15T10:05:00Z                        │
│  window_size:    1_MINUTE                                    │
│  click_count:    1,247                                       │
│  unique_users:   1,189                                       │
│  country_breakdown: { "US": 634, "UK": 213, "DE": 178, … }  │
│  device_breakdown:  { "mobile": 823, "desktop": 424 }       │
└──────────────────────────────────────────────────────────────┘

High-Level Architecture

The system follows a streaming-first architecture with a Lambda architecture overlay for correctness guarantees:

                        ┌─────────────────────────────────────────────────┐
                        │                  Ad Click Flow                   │
                        └─────────────────────────────────────────────────┘

  User clicks ad        Ad Server logs click       Kafka ingestion
  ─────────────── ──►  ───────────────────── ──►  ──────────────────
                                                         │
                                 ┌───────────────────────┼───────────────────────┐
                                 │                       │                       │
                                 ▼                       ▼                       ▼
                        ┌──────────────┐       ┌──────────────┐       ┌──────────────┐
                        │   Speed      │       │   Batch      │       │   Raw Data   │
                        │   Layer      │       │   Layer      │       │   Lake       │
                        │  (Flink)     │       │  (Spark)     │       │  (S3/HDFS)   │
                        └──────┬───────┘       └──────┬───────┘       └──────────────┘
                               │                      │
                               ▼                      ▼
                        ┌──────────────┐       ┌──────────────┐
                        │  Real-Time   │       │   Batch      │
                        │  Agg Store   │       │   Agg Store  │
                        │ (ClickHouse) │       │ (ClickHouse) │
                        └──────┬───────┘       └──────┬───────┘
                               │                      │
                               └──────────┬───────────┘
                                          ▼
                                   ┌──────────────┐
                                   │   Serving    │
                                   │   Layer      │
                                   │  (Query API) │
                                   └──────┬───────┘
                                          │
                                          ▼
                                   ┌──────────────┐
                                   │  Dashboards  │
                                   │  & Billing   │
                                   └──────────────┘

Kafka Ingestion Layer

Apache Kafka serves as the central nervous system of the pipeline. Every click event enters the system through Kafka, providing durability, ordering guarantees (per partition), and decoupling between producers and consumers.

Topic Design

# Primary click events topic — partitioned by ad_id for ordering guarantees
Topic: ad-clicks
  Partitions: 256    (~2,000 events/sec per partition at peak load)
  Replication: 3     (fault tolerance — survive 2 broker failures)
  Retention: 7 days  (reprocessing window)
  Cleanup: delete    (compaction not needed — append-only)

# Dead letter queue for malformed / unprocessable events
Topic: ad-clicks-dlq
  Partitions: 16
  Replication: 3
  Retention: 30 days

# Aggregated results (for downstream consumers)
Topic: ad-click-aggregates
  Partitions: 64
  Replication: 3
  Retention: 3 days
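
For reference, a topic with this layout could be provisioned programmatically. A minimal sketch using Kafka's AdminClient (broker addresses mirror the producer config below; exception handling elided):

// Provisioning the primary topic via Kafka AdminClient
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
    "kafka-1:9092,kafka-2:9092,kafka-3:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    NewTopic adClicks = new NewTopic("ad-clicks", 256, (short) 3)
        .configs(Map.of(
            "retention.ms",   String.valueOf(7L * 24 * 60 * 60 * 1000),  // 7 days
            "cleanup.policy", "delete"));
    admin.createTopics(List.of(adClicks)).all().get();  // blocks until created
}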

Partitioning Strategy

Partitioning by ad_id is critical: it ensures all clicks for a given ad land on the same partition, enabling correct per-ad counting without cross-partition coordination.

// Kafka producer configuration (Java)
Properties props = new Properties();
props.put("bootstrap.servers",       "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put("key.serializer",          "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",        "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks",                    "all");           // Wait for all replicas
props.put("retries",                 3);               // Retry on transient failures
props.put("enable.idempotence",      "true");          // Exactly-once producer
props.put("max.in.flight.requests",  5);               // Safe with idempotence
props.put("compression.type",        "lz4");           // ~70% compression ratio
props.put("linger.ms",              5);                // Micro-batch for throughput
props.put("batch.size",             65536);            // 64KB batches

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Partition by ad_id — all clicks for same ad go to same partition
ProducerRecord<String, String> record = new ProducerRecord<>(
    "ad-clicks",
    clickEvent.getAdId(),        // key = ad_id → deterministic partition
    objectMapper.writeValueAsString(clickEvent)
);

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Route to DLQ after exhausting retries
        deadLetterProducer.send(new ProducerRecord<>("ad-clicks-dlq",
            clickEvent.getAdId(), objectMapper.writeValueAsString(clickEvent)));
        metrics.counter("click.produce.failed").increment();
    } else {
        metrics.counter("click.produce.success").increment();
    }
});

Why partition by ad_id? With 256 partitions and ~115K clicks/sec, each partition handles ~450 clicks/sec on average. This keeps per-partition throughput well within Kafka's capabilities (~10K messages/sec per partition). More importantly, it enables stateful stream processing — the Flink operator counting clicks for ad X sees all of ad X's clicks on one task without shuffling.
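
Kafka's default partitioner makes this deterministic: it hashes the record key with murmur2 and takes the result modulo the partition count, so the same ad_id always lands on the same partition. A minimal sketch of that computation, using Kafka's own Utils class:

// How the default partitioner maps ad_id -> partition
import org.apache.kafka.common.utils.Utils;
import java.nio.charset.StandardCharsets;

public class PartitionDemo {
    public static void main(String[] args) {
        String adId = "ad-00042837";
        int numPartitions = 256;
        // murmur2 hash of the key bytes, forced positive, mod partition count
        int partition = Utils.toPositive(
            Utils.murmur2(adId.getBytes(StandardCharsets.UTF_8))) % numPartitions;
        System.out.println(adId + " -> partition " + partition);
    }
}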

Real-Time Aggregation Pipeline

The speed layer uses Apache Flink for stateful stream processing with exactly-once semantics. Click events flow through a pipeline of operators: deduplication → windowed aggregation → sink to aggregation store.

The core Flink job implements deduplication, windowed counting, and exactly-once delivery to the sink:

// Flink Streaming Job: Ad Click Aggregator
public class AdClickAggregatorJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ── Exactly-once checkpointing ──────────────────────────────────
        env.enableCheckpointing(60_000);  // Checkpoint every 60 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        env.getCheckpointConfig().setCheckpointTimeout(120_000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // State backend: RocksDB for large state (dedup + window state)
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/ad-clicks/");

        // ── Kafka Source ────────────────────────────────────────────────
        KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
            .setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
            .setTopics("ad-clicks")
            .setGroupId("flink-ad-click-aggregator")
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setDeserializer(new ClickEventDeserializationSchema())
            .build();

        DataStream<ClickEvent> clicks = env.fromSource(
            source,
            WatermarkStrategy
                .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, ts) -> event.getTimestamp())
                .withIdleness(Duration.ofMinutes(1)),
            "Kafka-Click-Source"
        );

        // ── Step 1: Deduplication ───────────────────────────────────────
        // Same user + same ad within 60 seconds = 1 click
        DataStream<ClickEvent> deduplicated = clicks
            .keyBy(e -> e.getUserId() + ":" + e.getAdId())
            .process(new ClickDeduplicationFunction())
            .name("Click-Deduplication");

        // ── Step 2: 1-Minute Tumbling Window Aggregation ────────────────
        // Late events (beyond allowed lateness) go to a side output
        final OutputTag<ClickEvent> lateOutputTag =
            new OutputTag<ClickEvent>("late-clicks") {};

        SingleOutputStreamOperator<ClickAggregate> oneMinAggs = deduplicated
            .keyBy(ClickEvent::getAdId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(30))
            .sideOutputLateData(lateOutputTag)
            .aggregate(new ClickCountAggregator(), new ClickWindowFunction())
            .name("1-Min-Window-Aggregation");

        // ── Step 3: 5-Minute Tumbling Window Aggregation ────────────────
        DataStream<ClickAggregate> fiveMinAggs = deduplicated
            .keyBy(ClickEvent::getAdId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .allowedLateness(Time.minutes(1))
            .sideOutputLateData(lateOutputTag)
            .aggregate(new ClickCountAggregator(), new ClickWindowFunction())
            .name("5-Min-Window-Aggregation");

        // ── Step 4: 1-Hour Tumbling Window Aggregation ──────────────────
        DataStream<ClickAggregate> hourlyAggs = deduplicated
            .keyBy(ClickEvent::getAdId)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .allowedLateness(Time.minutes(5))
            .sideOutputLateData(lateOutputTag)
            .aggregate(new ClickCountAggregator(), new ClickWindowFunction())
            .name("1-Hour-Window-Aggregation");

        // ── Step 5: Sink to ClickHouse ──────────────────────────────────
        oneMinAggs.union(fiveMinAggs, hourlyAggs)
            .addSink(new ClickHouseAggSink("jdbc:clickhouse://clickhouse:8123/ad_analytics"))
            .name("ClickHouse-Sink");

        // ── Late data handling ──────────────────────────────────────────
        // KafkaSink requires a record serializer; ClickEventSerializationSchema
        // is assumed to mirror the deserializer used by the source above
        oneMinAggs.getSideOutput(lateOutputTag)
            .sinkTo(KafkaSink.<ClickEvent>builder()
                .setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
                .setRecordSerializer(new ClickEventSerializationSchema("ad-clicks-late"))
                .build())
            .name("Late-Data-Sink");

        env.execute("Ad Click Aggregator Pipeline");
    }
}

Deduplication Function

The deduplication operator uses Flink's keyed state with a TTL to track recently seen clicks. It keys by (user_id, ad_id) and drops duplicate clicks within a 60-second window:

public class ClickDeduplicationFunction
    extends KeyedProcessFunction<String, ClickEvent, ClickEvent> {

    // State: timestamp of last seen click for this (user, ad) pair
    private ValueState<Long> lastSeenState;

    // Side output for dedup metrics, consumed by a separate metrics sink
    private static final OutputTag<Metric> metricsTag =
        new OutputTag<Metric>("dedup-metrics") {};

    @Override
    public void open(Configuration params) {
        ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>(
            "last-seen-ts", Types.LONG);

        // Auto-expire state after 90 seconds (slightly > dedup window)
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.seconds(90))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(
                StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(1000)
            .build();
        desc.enableTimeToLive(ttl);

        lastSeenState = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(ClickEvent click, Context ctx,
                               Collector<ClickEvent> out) throws Exception {
        Long lastSeen = lastSeenState.value();
        long now = click.getTimestamp();

        if (lastSeen == null || (now - lastSeen) > 60_000) {
            // New click or outside dedup window → emit
            lastSeenState.update(now);
            out.collect(click);
            ctx.output(metricsTag, new Metric("click.unique", 1));
        } else {
            // Duplicate within 60s → suppress
            ctx.output(metricsTag, new Metric("click.duplicate", 1));
        }
    }
}

Aggregation Function

public class ClickCountAggregator
    implements AggregateFunction<ClickEvent, ClickAccumulator, ClickAccumulator> {

    @Override
    public ClickAccumulator createAccumulator() {
        return new ClickAccumulator();
    }

    @Override
    public ClickAccumulator add(ClickEvent click, ClickAccumulator acc) {
        acc.totalClicks++;
        acc.uniqueUsers.add(click.getUserId());
        acc.countryBreakdown.merge(click.getCountry(), 1L, Long::sum);
        acc.deviceBreakdown.merge(click.getDeviceType(), 1L, Long::sum);
        return acc;
    }

    @Override
    public ClickAccumulator getResult(ClickAccumulator acc) {
        return acc;
    }

    @Override
    public ClickAccumulator merge(ClickAccumulator a, ClickAccumulator b) {
        a.totalClicks += b.totalClicks;
        a.uniqueUsers.addAll(b.uniqueUsers);
        b.countryBreakdown.forEach((k, v) ->
            a.countryBreakdown.merge(k, v, Long::sum));
        b.deviceBreakdown.forEach((k, v) ->
            a.deviceBreakdown.merge(k, v, Long::sum));
        return a;
    }
}

// Window function adds window metadata to the aggregate
public class ClickWindowFunction
    extends ProcessWindowFunction<ClickAccumulator, ClickAggregate, String, TimeWindow> {

    @Override
    public void process(String adId, Context ctx,
                        Iterable<ClickAccumulator> accs,
                        Collector<ClickAggregate> out) {
        ClickAccumulator acc = accs.iterator().next();
        TimeWindow window = ctx.window();

        out.collect(new ClickAggregate(
            adId,
            Instant.ofEpochMilli(window.getStart()),
            Instant.ofEpochMilli(window.getEnd()),
            windowSizeName(window),
            acc.totalClicks,
            acc.uniqueUsers.size(),
            acc.countryBreakdown,
            acc.deviceBreakdown
        ));
    }
    // Map window length to a size label ("1_MINUTE", "5_MINUTE", "1_HOUR")
    private static String windowSizeName(TimeWindow window) {
        long ms = window.getEnd() - window.getStart();
        if (ms == 60_000L)    return "1_MINUTE";
        if (ms == 300_000L)   return "5_MINUTE";
        if (ms == 3_600_000L) return "1_HOUR";
        return ms + "_MS";
    }
}
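
The accumulator referenced above is a small mutable holder; a minimal sketch, assuming exact distinct counting with an in-memory set (for very hot ads, a HyperLogLog sketch would bound this state instead):

// Hypothetical accumulator backing ClickCountAggregator
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ClickAccumulator {
    public long totalClicks = 0;
    public Set<String> uniqueUsers = new HashSet<>();             // exact distinct users
    public Map<String, Long> countryBreakdown = new HashMap<>();  // country -> clicks
    public Map<String, Long> deviceBreakdown = new HashMap<>();   // device -> clicks
}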

Click Deduplication Strategies

Deduplication is critical for accurate billing. A user accidentally double-clicking an ad should not charge the advertiser twice. There are multiple approaches, each with distinct trade-offs:

Strategy Comparison

┌─────────────────────────┬──────────────────────────┬───────────────────────────┬───────────────────┬─────────────────────────────────┐
│  Strategy               │  Space                   │  Accuracy                 │  Latency          │  Use Case                       │
├─────────────────────────┼──────────────────────────┼───────────────────────────┼───────────────────┼─────────────────────────────────┤
│  Flink Keyed State      │  ~20 bytes/key, RocksDB  │  Exact (within TTL)       │  Microseconds     │  Primary dedup in stream        │
│  Redis SET with TTL     │  ~100 bytes/key          │  Exact                    │  ~1 ms (network)  │  Cross-service dedup            │
│  Bloom Filter           │  ~10 bits/element        │  ~1% false positives      │  Microseconds     │  Pre-filter before exact check  │
│  Counting Bloom Filter  │  ~40 bits/element        │  ~1% FP, supports delete  │  Microseconds     │  Time-window rotation           │
└─────────────────────────┴──────────────────────────┴───────────────────────────┴───────────────────┴─────────────────────────────────┘

Redis-Based Deduplication

For cross-service deduplication (e.g., the ad server itself deduplicating before sending to Kafka):

# Redis deduplication with 60-second TTL
# Key format: dedup:{user_id}:{ad_id}
# Value: 1 (existence check only)

def is_duplicate_click(redis_client, user_id, ad_id, ttl_seconds=60):
    """
    Returns True if this click is a duplicate (should be suppressed).
    Uses SET NX (set-if-not-exists) for atomic check-and-set.
    """
    key = f"dedup:{user_id}:{ad_id}"

    # SET key 1 NX EX 60
    # NX = only set if not exists
    # EX = expire after 60 seconds
    was_set = redis_client.set(key, 1, nx=True, ex=ttl_seconds)

    if was_set:
        # Key didn't exist → first click → NOT a duplicate
        return False
    else:
        # Key already existed → duplicate within TTL window
        return True

# Memory estimate for Redis dedup:
# 115K clicks/sec × 60s TTL = ~6.9M active keys
# ~100 bytes per key (key + overhead) = ~690 MB
# With Redis Cluster (3 shards) = ~230 MB per shard

Bloom Filter Pre-Filtering

For extremely high throughput, use a two-layer dedup: Bloom filter as a fast pre-filter, then exact check only for potential duplicates:

public class TwoLayerDeduplication {
    // Rotating Bloom filters — current minute + previous minute
    private BloomFilter<String> currentMinuteFilter;
    private BloomFilter<String> previousMinuteFilter;
    private long currentMinuteTs;

    // Expected elements: 115K clicks/sec × 60s = ~7M per minute
    // Target FPR: 0.1% → ~14.4 bits/element → ~12.6 MB per filter
    private static final int EXPECTED_INSERTIONS = 7_000_000;
    private static final double FPP = 0.001;

    public TwoLayerDeduplication() {
        this.currentMinuteFilter = BloomFilter.create(
            Funnels.stringFunnel(StandardCharsets.UTF_8),
            EXPECTED_INSERTIONS, FPP);
        this.previousMinuteFilter = BloomFilter.create(
            Funnels.stringFunnel(StandardCharsets.UTF_8),
            EXPECTED_INSERTIONS, FPP);
        this.currentMinuteTs = currentMinute();
    }

    public boolean isDuplicate(String userId, String adId) {
        rotateIfNeeded();
        String key = userId + ":" + adId;

        // Fast path: if NOT in either Bloom filter → definitely unique
        if (!currentMinuteFilter.mightContain(key) &&
            !previousMinuteFilter.mightContain(key)) {
            currentMinuteFilter.put(key);
            return false;
        }

        // Slow path: Bloom filter says "maybe" → check exact store
        // (Redis or Flink state)
        return checkExactStore(key);
    }

    private void rotateIfNeeded() {
        long now = currentMinute();
        if (now > currentMinuteTs) {
            previousMinuteFilter = currentMinuteFilter;
            currentMinuteFilter = BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8),
                EXPECTED_INSERTIONS, FPP);
            currentMinuteTs = now;
        }
    }

    private static long currentMinute() {
        return System.currentTimeMillis() / 60_000L;
    }

    private boolean checkExactStore(String key) {
        // Exact lookup against Redis (SET NX, as shown above) or Flink keyed
        // state; integration elided in this sketch
        return false;
    }
}

Aggregation Store — ClickHouse

ClickHouse is an ideal aggregation store for this use case: it's a columnar OLAP database designed for real-time analytical queries over billions of rows, with sub-second latency for aggregation queries.

ClickHouse Schema

-- Raw click events (for batch reprocessing and auditing)
CREATE TABLE ad_clicks_raw ON CLUSTER '{cluster}'
(
    click_id       String,
    ad_id          LowCardinality(String),
    campaign_id    LowCardinality(String),
    advertiser_id  LowCardinality(String),
    user_id        String,
    timestamp      DateTime64(3, 'UTC'),
    ip_address     IPv4,
    country        LowCardinality(FixedString(2)),
    device_type    Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
    os             LowCardinality(String),
    browser        LowCardinality(String),
    referrer_url   String,
    landing_url    String,
    is_duplicate   UInt8 DEFAULT 0,

    -- Partition and sort keys for efficient scans
    event_date     Date DEFAULT toDate(timestamp)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/ad_clicks_raw',
                             '{replica}')
PARTITION BY toYYYYMMDD(event_date)
ORDER BY (ad_id, timestamp, click_id)
TTL event_date + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

-- Pre-aggregated 1-minute click counts
CREATE TABLE ad_click_counts_1m ON CLUSTER '{cluster}'
(
    ad_id          LowCardinality(String),
    campaign_id    LowCardinality(String),
    advertiser_id  LowCardinality(String),
    window_start   DateTime('UTC'),
    window_end     DateTime('UTC'),
    click_count    UInt64,
    unique_users   UInt64,
    country        LowCardinality(FixedString(2)),
    device_type    Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),

    event_date     Date DEFAULT toDate(window_start)
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ad_click_counts_1m',
                                     '{replica}',
                                     (click_count, unique_users))
PARTITION BY toYYYYMM(event_date)
ORDER BY (ad_id, country, device_type, window_start)
TTL event_date + INTERVAL 2 YEAR
SETTINGS index_granularity = 8192;

-- Materialized view: auto-aggregate raw → 1-minute
CREATE MATERIALIZED VIEW ad_click_counts_1m_mv ON CLUSTER '{cluster}'
TO ad_click_counts_1m
AS
SELECT
    ad_id,
    campaign_id,
    advertiser_id,
    toStartOfMinute(timestamp)                         AS window_start,
    toStartOfMinute(timestamp) + INTERVAL 1 MINUTE     AS window_end,
    count()                                             AS click_count,
    uniqExact(user_id)                                  AS unique_users,
    country,
    device_type
FROM ad_clicks_raw
WHERE is_duplicate = 0
GROUP BY ad_id, campaign_id, advertiser_id, window_start,
         window_end, country, device_type;

-- 1-hour rollup from 1-minute aggregates
CREATE TABLE ad_click_counts_1h ON CLUSTER '{cluster}'
(
    ad_id          LowCardinality(String),
    campaign_id    LowCardinality(String),
    advertiser_id  LowCardinality(String),
    window_start   DateTime('UTC'),
    window_end     DateTime('UTC'),
    click_count    UInt64,
    unique_users   UInt64,
    country        LowCardinality(FixedString(2)),
    device_type    Enum8('desktop' = 1, 'mobile' = 2, 'tablet' = 3),
    event_date     Date DEFAULT toDate(window_start)
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/ad_click_counts_1h',
                                     '{replica}',
                                     (click_count, unique_users))
PARTITION BY toYYYYMM(event_date)
ORDER BY (ad_id, country, device_type, window_start)
TTL event_date + INTERVAL 2 YEAR
SETTINGS index_granularity = 8192;

Query Examples

-- Query 1: Clicks for ad X in the last 5 minutes (real-time)
-- Note: summing per-window unique_users over-counts users active in
-- multiple windows (or on multiple devices); exact cross-window uniques
-- need uniqExactState / AggregatingMergeTree or an HLL sketch instead.
SELECT
    sum(click_count)    AS total_clicks,
    sum(unique_users)   AS total_unique
FROM ad_click_counts_1m
WHERE ad_id = 'ad-00042837'
  AND window_start >= now() - INTERVAL 5 MINUTE;
-- Response time: ~5 ms (scans a handful of rows per shard)

-- Query 2: Hourly trend for campaign over 7 days
SELECT
    toStartOfHour(window_start) AS hour,
    sum(click_count)            AS clicks,
    sum(unique_users)           AS uniques
FROM ad_click_counts_1h
WHERE campaign_id = 'camp-001294'
  AND window_start >= now() - INTERVAL 7 DAY
GROUP BY hour
ORDER BY hour;
-- Response time: ~50ms (168 hourly buckets)

-- Query 3: Top ads by country breakdown
SELECT
    ad_id,
    country,
    sum(click_count) AS clicks
FROM ad_click_counts_1h
WHERE advertiser_id = 'adv-000512'
  AND window_start >= now() - INTERVAL 24 HOUR
GROUP BY ad_id, country
ORDER BY clicks DESC
LIMIT 100;
-- Response time: ~100ms

Lambda Architecture

Real-time stream processing is fast but can drift from ground truth due to late-arriving events, reprocessing after failures, or edge cases in deduplication. The Lambda architecture addresses this by running two parallel computation paths:

  1. Speed Layer (Flink) — provides low-latency approximate results within seconds. Handles dedup and windowed aggregation in real-time. May slightly over- or under-count due to late arrivals.
  2. Batch Layer (Spark) — reprocesses raw click data periodically (every 1–6 hours). Produces exact, reconciled counts by processing the complete dataset with full deduplication.
  3. Serving Layer (Query API) — merges results from both layers. For recent data (< 2 hours), serve speed layer results. For older data, serve batch layer results. During overlap, prefer batch results (see the routing sketch below).
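
A minimal sketch of that routing rule, assuming a two-hour batch cutoff and the table names used later in this post:

// Serving-layer routing: prefer reconciled batch results once available
import java.time.Duration;
import java.time.Instant;

public class ServingLayerRouter {
    // Windows older than this are assumed to be covered by the batch layer
    private static final Duration BATCH_CUTOFF = Duration.ofHours(2);

    public String tableFor(Instant windowStart) {
        Instant cutoff = Instant.now().minus(BATCH_CUTOFF);
        return windowStart.isBefore(cutoff)
            ? "ad_click_counts_1m_batch"   // exact, reconciled counts
            : "ad_click_counts_1m";        // real-time, may drift slightly
    }
}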

Batch Layer — Spark Aggregation

# Spark batch aggregation job (runs every hour)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("AdClickBatchAggregator") \
    .config("spark.sql.shuffle.partitions", 256) \
    .config("spark.sql.adaptive.enabled", True) \
    .getOrCreate()

# Read raw click events from data lake (S3/HDFS)
raw_clicks = spark.read.parquet(
    "s3://ad-data-lake/clicks/",
    mergeSchema=True
).filter(
    F.col("event_date") >= F.date_sub(F.current_date(), 2)
)

# Step 1: Exact deduplication using window functions
# Keep only clicks with a > 60-second gap from the previous click per
# (user_id, ad_id). Note: this gap-based rule differs slightly from the
# stream's fixed-60s-window dedup for rapid click chains.
dedup_window = Window.partitionBy("user_id", "ad_id") \
    .orderBy("timestamp")

deduped = raw_clicks \
    .withColumn("prev_ts",
        F.lag("timestamp").over(dedup_window)) \
    .filter(
        F.col("prev_ts").isNull() |  # first click ever
        (F.unix_timestamp("timestamp") -
         F.unix_timestamp("prev_ts") > 60)  # > 60s gap
    ) \
    .drop("prev_ts")

# Step 2: Aggregate into 1-minute windows
minute_aggs = deduped \
    .withColumn("window_start",
        F.date_trunc("minute", "timestamp")) \
    .groupBy("ad_id", "campaign_id", "advertiser_id",
             "window_start", "country", "device_type") \
    .agg(
        F.count("*").alias("click_count"),
        F.countDistinct("user_id").alias("unique_users")
    ) \
    .withColumn("window_end",
        F.col("window_start") + F.expr("INTERVAL 1 MINUTE")) \
    .withColumn("source", F.lit("batch"))

# Step 3: Write to batch aggregation store
minute_aggs.write \
    .format("jdbc") \
    .option("url", "jdbc:clickhouse://clickhouse:8123/ad_analytics") \
    .option("dbtable", "ad_click_counts_1m_batch") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

print(f"Batch aggregation complete: {minute_aggs.count()} records")

Data Reconciliation

The reconciliation service compares speed layer and batch layer results, flagging and correcting discrepancies:

-- Reconciliation query: compare real-time vs batch counts
-- Run after each batch job completes
SELECT
    rt.ad_id,
    rt.window_start,
    rt.click_count    AS realtime_count,
    bt.click_count    AS batch_count,
    abs(rt.click_count - bt.click_count) AS discrepancy,
    round(abs(rt.click_count - bt.click_count) * 100.0
          / bt.click_count, 2)           AS discrepancy_pct
FROM ad_click_counts_1m AS rt
JOIN ad_click_counts_1m_batch AS bt
  ON rt.ad_id = bt.ad_id
 AND rt.window_start = bt.window_start
 AND rt.country = bt.country
 AND rt.device_type = bt.device_type
WHERE bt.window_start >= now() - INTERVAL 6 HOUR
  AND abs(rt.click_count - bt.click_count) > 0
ORDER BY discrepancy_pct DESC
LIMIT 100;

-- Auto-correct: replace real-time counts with batch counts
-- for windows where batch data is available
INSERT INTO ad_click_counts_1m
SELECT
    bt.ad_id, bt.campaign_id, bt.advertiser_id,
    bt.window_start, bt.window_end,
    bt.click_count, bt.unique_users,
    bt.country, bt.device_type, toDate(bt.window_start)
FROM ad_click_counts_1m_batch AS bt
WHERE bt.window_start >= now() - INTERVAL 6 HOUR;
-- Caution: SummingMergeTree ADDS rows with the same key on merge, so a
-- blind INSERT double-counts. Drop/replace the affected partitions first,
-- or use ReplacingMergeTree for true replacement semantics.

Exactly-Once Counting

Achieving exactly-once semantics end-to-end requires coordination across three layers:

End-to-End Exactly-Once Pipeline

┌─────────────────────────────────────────────────────────────────────┐
│                  Exactly-Once Guarantee Chain                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Producer          Kafka            Flink           ClickHouse      │
│  ─────────         ─────            ─────           ──────────      │
│  idempotent   →   exactly-once  →  checkpointed →  idempotent      │
│  producer          replication     state + offsets   upserts         │
│                                                                     │
│  acks=all          min.isr=2       checkpoint       ReplacingMerge  │
│  retries=3         unclean=false   interval=60s     Tree or         │
│  idempotence       log repl=3      RocksDB state    deduplicate     │
│  =true                              backend         on INSERT       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Flink checkpointing is the linchpin. During a checkpoint, Flink atomically snapshots the Kafka consumer offsets of every source task, all operator state (dedup state and window accumulators), and any buffered-but-unflushed sink output.

On failure recovery, Flink restores from the last successful checkpoint: it resets Kafka offsets, restores operator state, and replays from the checkpoint position. Any writes to ClickHouse are idempotent (using ReplacingMergeTree or dedup on INSERT), so replayed outputs don't cause double-counting.

// Flink ClickHouse sink with idempotent writes
public class ClickHouseAggSink extends RichSinkFunction<ClickAggregate>
    implements CheckpointedFunction {

    private ListState<ClickAggregate> bufferedState;
    private List<ClickAggregate> buffer = new ArrayList<>();
    private transient DataSource dataSource;  // initialized in open(); setup elided

    @Override
    public void invoke(ClickAggregate agg, Context ctx) throws Exception {
        buffer.add(agg);
        if (buffer.size() >= 1000) {
            flushBuffer();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Flush to ClickHouse before checkpoint completes
        flushBuffer();
        bufferedState.clear();
        bufferedState.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        bufferedState = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("ch-buffer",
                ClickAggregate.class));

        if (ctx.isRestored()) {
            // Re-flush any buffered records from pre-failure checkpoint
            for (ClickAggregate agg : bufferedState.get()) {
                buffer.add(agg);
            }
            flushBuffer();  // Idempotent — ClickHouse deduplicates
        }
    }

    private void flushBuffer() throws SQLException {
        if (buffer.isEmpty()) return;
        // Replicated*MergeTree deduplicates retried INSERTs of an identical
        // block (insert_deduplicate), so re-flushing the same batch after
        // recovery does not double-count
        String sql = "INSERT INTO ad_click_counts_1m " +
            "(ad_id, campaign_id, advertiser_id, window_start, " +
            " window_end, click_count, unique_users, country, device_type) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (ClickAggregate agg : buffer) {
                ps.setString(1, agg.adId);
                ps.setString(2, agg.campaignId);
                ps.setString(3, agg.advertiserId);
                ps.setTimestamp(4, Timestamp.from(agg.windowStart));
                ps.setTimestamp(5, Timestamp.from(agg.windowEnd));
                ps.setLong(6, agg.clickCount);
                ps.setLong(7, agg.uniqueUsers);
                ps.setString(8, agg.country);
                ps.setString(9, agg.deviceType);
                ps.addBatch();
            }
            ps.executeBatch();
            buffer.clear();
        }
    }
}

Query Service

The query service is a thin API layer that routes queries to the appropriate store based on the time range and freshness requirements:

@RestController
@RequestMapping("/api/v1/clicks")
public class ClickQueryController {

    private final ClickHouseTemplate clickHouse;
    private final CacheManager cache;  // TTL-aware get-or-compute wrapper, not Spring's raw CacheManager API

    // GET /api/v1/clicks/ad/{adId}?window=5m
    @GetMapping("/ad/{adId}")
    public ClickResponse getAdClicks(
            @PathVariable String adId,
            @RequestParam(defaultValue = "5m") String window,
            @RequestParam(required = false) String country,
            @RequestParam(required = false) String device) {

        Duration dur = parseDuration(window);
        String cacheKey = String.format("clicks:%s:%s:%s:%s",
            adId, window, country, device);

        // Cache short windows briefly (10s), longer windows more (60s)
        Duration cacheTtl = dur.toMinutes() <= 5
            ? Duration.ofSeconds(10)
            : Duration.ofSeconds(60);

        return cache.get(cacheKey, cacheTtl, () -> {
            // Choose table based on window size
            String table = chooseTable(dur);

            // Build and execute ClickHouse query
            return clickHouse.query(
                "SELECT sum(click_count) AS clicks, " +
                "       sum(unique_users) AS uniques " +
                "FROM " + table + " " +
                "WHERE ad_id = ? " +
                "  AND window_start >= now() - toIntervalSecond(?) " +
                buildFilters(country, device),
                adId, dur.getSeconds()
            );
        });
    }

    // Route to the most efficient table: hourly rollup for day+ ranges,
    // minute-level aggregates otherwise
    private String chooseTable(Duration window) {
        return window.toHours() >= 24 ? "ad_click_counts_1h"
                                      : "ad_click_counts_1m";
    }
}
}

Fraud Detection

Click fraud costs advertisers an estimated $100 billion annually. The aggregator must detect and filter fraudulent clicks before they affect billing. Fraud signals are evaluated at multiple levels:

Fraud Detection Signals

┌────────────────────────┬─────────────────────────────────────────────────────────────┬────────────────────────┐
│  Signal                │  Detection Method                                           │  Action                │
├────────────────────────┼─────────────────────────────────────────────────────────────┼────────────────────────┤
│  IP click flooding     │  > 50 clicks from same IP in 1 minute                       │  Block IP, flag clicks │
│  Click farm patterns   │  High click volume from narrow IP range + low conversion    │  Flag campaign, alert  │
│  Bot signatures        │  Missing/invalid browser fingerprint, datacenter IPs        │  Drop click            │
│  Geographic anomalies  │  Clicks from non-targeted regions                           │  Exclude from billing  │
│  Timing patterns       │  Perfectly periodic clicks (e.g., exactly every 5 seconds)  │  Flag as bot           │
│  CTR anomalies         │  Click-through rate > 3σ above campaign average             │  Hold for review       │
└────────────────────────┴─────────────────────────────────────────────────────────────┴────────────────────────┘

A Flink operator implements the flooding, timing-pattern, and velocity checks in-stream:

public class ClickFraudDetector
    extends KeyedProcessFunction<String, ClickEvent, FraudAlert> {

    // State: sliding window of click timestamps per IP
    private MapState<String, List<Long>> ipClickWindows;
    // State: per-ad click velocity
    private ValueState<ClickVelocity> adVelocity;

    @Override
    public void open(Configuration params) {
        MapStateDescriptor<String, List<Long>> ipDesc =
            new MapStateDescriptor<>("ip-clicks", String.class,
                TypeInformation.of(new TypeHint<List<Long>>(){}));
        StateTtlConfig ipTtl = StateTtlConfig
            .newBuilder(Time.minutes(5))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .build();
        ipDesc.enableTimeToLive(ipTtl);
        ipClickWindows = getRuntimeContext().getMapState(ipDesc);

        ValueStateDescriptor<ClickVelocity> velDesc =
            new ValueStateDescriptor<>("ad-velocity", ClickVelocity.class);
        adVelocity = getRuntimeContext().getState(velDesc);
    }

    @Override
    public void processElement(ClickEvent click, Context ctx,
                               Collector<FraudAlert> out) throws Exception {
        String ip = click.getIpAddress();
        long now = click.getTimestamp();

        // ── Check 1: IP flooding ────────────────────────────────
        List<Long> ipClicks = ipClickWindows.get(ip);
        if (ipClicks == null) ipClicks = new ArrayList<>();

        // Remove clicks older than 60 seconds
        ipClicks.removeIf(ts -> (now - ts) > 60_000);
        ipClicks.add(now);
        ipClickWindows.put(ip, ipClicks);

        if (ipClicks.size() > 50) {
            out.collect(new FraudAlert(
                FraudType.IP_FLOODING, click,
                "IP " + ip + " made " + ipClicks.size() +
                " clicks in 60s"));
            return; // Don't count this click
        }

        // ── Check 2: Timing pattern (periodic clicks) ──────────
        if (ipClicks.size() >= 5) {
            List<Long> intervals = new ArrayList<>();
            for (int i = 1; i < ipClicks.size(); i++) {
                intervals.add(ipClicks.get(i) - ipClicks.get(i - 1));
            }
            double stdDev = calculateStdDev(intervals);
            if (stdDev < 100) { // Less than 100ms variation = bot
                out.collect(new FraudAlert(
                    FraudType.BOT_PATTERN, click,
                    "Periodic click pattern (σ=" + stdDev + "ms)"));
                return;
            }
        }

        // ── Check 3: Click velocity anomaly ─────────────────────
        ClickVelocity vel = adVelocity.value();
        if (vel == null) vel = new ClickVelocity();
        vel.addClick(now);
        adVelocity.update(vel);

        if (vel.getCurrentRate() > vel.getMovingAverage() * 3) {
            out.collect(new FraudAlert(
                FraudType.VELOCITY_ANOMALY, click,
                "Click rate " + vel.getCurrentRate() +
                "/min vs avg " + vel.getMovingAverage() + "/min"));
        }
    }

    private static double calculateStdDev(List<Long> values) {
        double mean = values.stream().mapToLong(Long::longValue).average().orElse(0);
        double variance = values.stream()
            .mapToDouble(v -> (v - mean) * (v - mean))
            .average().orElse(0);
        return Math.sqrt(variance);
    }
}
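
ClickVelocity is referenced but not defined above. A minimal sketch that tracks clicks-per-minute against an exponential moving average (the smoothing factor of 0.2 is an assumption):

// Hypothetical per-ad click-velocity tracker used by the detector above
public class ClickVelocity {
    private long currentMinute = -1;
    private long clicksThisMinute = 0;
    private double movingAvgPerMin = 0;    // EMA of completed minutes

    public void addClick(long timestampMs) {
        long minute = timestampMs / 60_000L;
        if (minute != currentMinute) {
            if (currentMinute >= 0) {
                // Fold the completed minute into the EMA (alpha = 0.2)
                movingAvgPerMin = 0.8 * movingAvgPerMin + 0.2 * clicksThisMinute;
            }
            currentMinute = minute;
            clicksThisMinute = 0;
        }
        clicksThisMinute++;
    }

    public double getCurrentRate()   { return clicksThisMinute; }

    // Floor at 1.0 so the 3x velocity check doesn't fire on a cold start
    public double getMovingAverage() { return Math.max(movingAvgPerMin, 1.0); }
}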

IP-Based Filtering

# IP reputation database: maintain a set of known-bad IPs
# Updated from threat intelligence feeds + own detection

class IPReputationFilter:
    def __init__(self, redis_client):
        self.redis = redis_client
        # Bloom filter for datacenter IP ranges (fast pre-filter)
        self.datacenter_ips = load_datacenter_ip_bloom_filter()

    def is_suspicious(self, ip_address: str) -> tuple[bool, str]:
        # Check 1: Known bad IP (per-IP key so the auto-blacklist TTL works)
        if self.redis.exists(f"ip:blacklist:{ip_address}"):
            return True, "blacklisted"

        # Check 2: Datacenter / VPN / Proxy IP
        if self.datacenter_ips.might_contain(ip_address):
            return True, "datacenter_ip"

        # Check 3: Rate limit per IP
        key = f"ip:rate:{ip_address}"
        count = self.redis.incr(key)
        if count == 1:
            self.redis.expire(key, 60)  # 60-second window
        if count > 100:
            # Auto-blacklist for 1 hour. Use a per-IP key with its own TTL;
            # individual members of a Redis SET cannot expire.
            self.redis.set(f"ip:blacklist:{ip_address}", 1, ex=3600)
            return True, "rate_limited"

        return False, "clean"

Scaling & Operational Concerns

Kafka Scaling

┌────────────────────────────────────────────────────────────────────┐
│  Kafka Cluster Sizing                                              │
├────────────────────────────────────────────────────────────────────┤
│  Brokers:      15 (5 racks × 3 brokers)                           │
│  Partitions:   256 (ad-clicks topic)                               │
│  Replication:  3 (cross-rack)                                      │
│  Throughput:   ~58 MB/sec ingress, ~174 MB/sec with replication    │
│  Disk:         7 days × 5 TB/day × 3 replicas ≈ 105 TB → ~7 TB/broker│
│  Network:      Each broker: ~12 MB/sec in, ~35 MB/sec out          │
│                (3× for replication + consumer fan-out)              │
└────────────────────────────────────────────────────────────────────┘

Flink Scaling

┌────────────────────────────────────────────────────────────────────┐
│  Flink Cluster Sizing                                              │
├────────────────────────────────────────────────────────────────────┤
│  TaskManagers:     32 (8 cores, 32 GB each)                        │
│  Parallelism:      256 (matches Kafka partitions)                  │
│  Checkpoint:       60s interval, S3 backend                        │
│  State size:       ~50 GB (dedup state + window accumulators)      │
│  RocksDB:          512 MB write buffer per TM, bloom filter on     │
│  Watermark delay:  10 seconds (bounded out-of-order)               │
│  Recovery time:    ~30 seconds from checkpoint                     │
└────────────────────────────────────────────────────────────────────┘

ClickHouse Cluster

┌────────────────────────────────────────────────────────────────────┐
│  ClickHouse Cluster Sizing                                         │
├────────────────────────────────────────────────────────────────────┤
│  Shards:     8 (distributed across AZs)                            │
│  Replicas:   2 per shard (16 nodes total)                          │
│  CPU:        32 cores per node                                     │
│  RAM:        256 GB per node (hot data in memory)                  │
│  Disk:       32 TB NVMe SSD per node (~28 TB of data each)         │
│  Data:       Raw clicks ~90 days (compressed ~2:1 → ~225 TB;       │
│              ×2 replicas ≈ 450 TB across 16 nodes)                 │
│              Aggregates ~2 years (very compact)                    │
│  Queries:    Sub-second for recent aggregates                      │
│              < 5s for full table scans over 24h                    │
└────────────────────────────────────────────────────────────────────┘

Aggregation Store Alternatives

ClickHouse (Columnar OLAP)
  Strengths:  Fastest aggregation queries, great compression, SQL interface
  Weaknesses: No true upsert, eventual consistency on merges

Apache Druid (Real-time OLAP)
  Strengths:  Native Kafka ingestion, sub-second for recent data, built-in rollup
  Weaknesses: Complex operations, high memory usage

InfluxDB (Time-Series DB)
  Strengths:  Purpose-built for time-series, built-in downsampling, Flux query language
  Weaknesses: Limited JOIN support, less flexible dimensions

Apache Pinot (Real-time OLAP)
  Strengths:  Upsert support, low latency, proven at LinkedIn scale
  Weaknesses: Smaller community, steeper learning curve

Redis + PostgreSQL (Cache + RDBMS)
  Strengths:  Simple, familiar, good for moderate scale
  Weaknesses: Won't scale to 10B clicks/day without significant engineering

Monitoring & Alerting

# Key metrics to monitor
┌──────────────────────────────────────────────────────────────────┐
│  Metric                          │  Alert Threshold              │
├──────────────────────────────────┼───────────────────────────────┤
│  Kafka consumer lag              │  > 100K messages              │
│  Flink checkpoint duration       │  > 120 seconds                │
│  Flink checkpoint failures       │  > 3 consecutive              │
│  End-to-end latency (click→agg)  │  > 10 seconds (p99)           │
│  Dedup ratio                     │  < 1% or > 20% (anomaly)      │
│  ClickHouse query latency (p99)  │  > 500 ms                     │
│  Fraud alert rate                │  > 5% of total clicks         │
│  Batch-vs-RT discrepancy         │  > 2% per window              │
│  Data lake write failures        │  Any                          │
│  Kafka broker disk usage         │  > 80%                        │
└──────────────────────────────────┴───────────────────────────────┘

Summary

┌─────────────────────┬──────────────────────────────────────┬──────────────────────────────────────────┐
│  Component          │  Technology                          │  Role                                    │
├─────────────────────┼──────────────────────────────────────┼──────────────────────────────────────────┤
│  Ingestion          │  Apache Kafka (256 partitions)       │  Durable, ordered event stream           │
│  Speed Layer        │  Apache Flink                        │  Real-time dedup + windowed aggregation  │
│  Batch Layer        │  Apache Spark                        │  Hourly exact reprocessing               │
│  Aggregation Store  │  ClickHouse (8 shards × 2 replicas)  │  Sub-second OLAP queries                 │
│  Deduplication      │  Flink state + Redis + Bloom filter  │  Multi-layer click dedup                 │
│  Fraud Detection    │  Flink CEP + IP reputation DB        │  Real-time fraud filtering               │
│  Data Lake          │  S3 / HDFS (Parquet format)          │  Raw event archive for reprocessing      │
│  Query Service      │  Spring Boot REST API                │  Unified query interface with caching    │
└─────────────────────┴──────────────────────────────────────┴──────────────────────────────────────────┘

Key design decisions:

  1. Kafka partitioned by ad_id enables local aggregation without shuffles.
  2. Lambda architecture provides both low-latency results and exact correctness via batch reconciliation.
  3. Three-layer deduplication (Bloom filter → Flink state → batch dedup) balances throughput and accuracy.
  4. ClickHouse's columnar storage and SummingMergeTree engine are purpose-built for exactly this workload.