
Design: Distributed Key-Value Store

Introduction

A distributed key-value (KV) store is one of the most fundamental and frequently asked system design interview topics. It underlies nearly every large-scale system: session stores, caching layers, configuration management, metadata services, and even primary databases. Systems like Amazon DynamoDB, Apache Cassandra, Riak, and Voldemort are all implementations of distributed KV stores.

This post is deeply rooted in the Amazon Dynamo paper (2007), which introduced many of the techniques we'll cover. If you can explain the design of a Dynamo-style KV store end-to-end, you can ace virtually any system design interview question on distributed storage.

What is a Key-Value Store?

A key-value store is a non-relational database where data is stored as a collection of key-value pairs. The key is a unique identifier, and the value is the data associated with that key. Think of it as a giant hash map, but distributed across multiple machines.

put(key, value)   →  Store the value associated with the key
get(key)          →  Retrieve the value associated with the key
delete(key)       →  Remove the key-value pair

Values can be anything: a string, a JSON blob, a serialized object, or even a binary blob. The store treats values as opaque — it doesn't inspect or index the contents.

Requirements

Functional Requirements

  • put(key, value): store the value associated with the key
  • get(key): retrieve the value associated with the key
  • delete(key): remove the key-value pair

Non-Functional Requirements

| Requirement | Target | Why It Matters |
|---|---|---|
| High Availability | 99.99% uptime | Shopping cart, session store — must always be writable |
| Scalability | Horizontal, 10s–1000s of nodes | Data grows unboundedly; add nodes without downtime |
| Partition Tolerance | Must survive network partitions | CAP theorem — partitions are inevitable in distributed systems |
| Tunable Consistency | Eventual → strong (configurable) | Different use cases need different consistency levels |
| Low Latency | p99 < 10 ms for reads and writes | Real-time applications demand sub-10 ms response times |
| Durability | No data loss after acknowledged writes | Data must survive disk failures and node crashes |

CAP trade-off: By the CAP theorem, during a network partition we must choose between Consistency and Availability. Dynamo-style stores choose AP (Availability + Partition tolerance) and provide eventual consistency. This is the key design philosophy that drives every decision in this system.

Scale Assumptions

  • Data size: petabytes in total, far beyond any single machine
  • Throughput: millions of operations per second across the cluster
  • Cluster size: tens to thousands of commodity nodes

Single-Server Key-Value Store

Before distributing anything, let's understand how a single-server KV store works. This is the building block.

In-Memory Hash Table

The simplest approach: keep everything in a hash table in memory.

class SingleServerKVStore:
    def __init__(self):
        self.store = {}   # hash map: key → value

    def put(self, key: str, value: bytes):
        self.store[key] = value

    def get(self, key: str) -> bytes | None:
        return self.store.get(key)

    def delete(self, key: str):
        self.store.pop(key, None)

Pros: O(1) reads and writes, dead simple. Cons: Data lost on crash, limited by single machine's RAM.

Durability: Write-Ahead Log (WAL)

To survive crashes, we add a Write-Ahead Log (WAL) — an append-only file on disk. Before modifying the in-memory hash table, we write the operation to the WAL.

import os

class DurableKVStore:
    def __init__(self, wal_path: str):
        self.store = {}
        self.wal = open(wal_path, 'a+b')
        self._recover()     # replay WAL on startup

    def put(self, key: str, value: bytes):
        # 1. Write to WAL first (fsync for durability)
        #    (naive text format: assumes keys and values contain no spaces or newlines)
        entry = f"PUT {key} {value.decode()}\n"
        self.wal.write(entry.encode())
        self.wal.flush()
        os.fsync(self.wal.fileno())
        # 2. Then update in-memory hash table
        self.store[key] = value

    def delete(self, key: str):
        # Log the delete before applying it, same as put
        self.wal.write(f"DEL {key}\n".encode())
        self.wal.flush()
        os.fsync(self.wal.fileno())
        self.store.pop(key, None)

    def _recover(self):
        """Replay WAL to rebuild in-memory state after crash"""
        self.wal.seek(0)
        for line in self.wal:
            parts = line.decode().rstrip('\n').split(' ', 2)
            if parts[0] == 'PUT':
                self.store[parts[1]] = parts[2].encode()
            elif parts[0] == 'DEL':
                self.store.pop(parts[1], None)

Handling Memory Limits: LSM Trees + SSTables

When data exceeds RAM, we use an LSM Tree (Log-Structured Merge Tree):

  1. MemTable — an in-memory sorted structure (red-black tree or skip list). Writes go here first.
  2. When the MemTable exceeds a threshold (e.g., 64 MB), it's flushed to disk as an SSTable (Sorted String Table) — an immutable, sorted file.
  3. Reads check the MemTable first, then SSTables from newest to oldest.
  4. Compaction periodically merges SSTables to reclaim space and reduce read amplification.

Write Path:   Client → WAL (append) → MemTable (sorted insert)
                                          ↓ (when full)
                                      Flush to SSTable on disk

Read Path:    Client → MemTable → SSTable₁ → SSTable₂ → ... → SSTableₙ
              (newest first, stop on first match)

Compaction:   SSTable₁ + SSTable₂ → Merged SSTable (remove duplicates/tombstones)

Bloom filters are critical here. Before checking each SSTable on disk, a Bloom filter tells us (probabilistically) whether the key might exist in that SSTable. This avoids unnecessary disk reads — reducing read latency from O(N SSTables) to O(1) in the common case.
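
A minimal sketch of how such a filter works. The sizes and the double-hashing scheme below are illustrative; real engines size the bit array and hash count for a target false-positive rate (often around 1%):

import hashlib

class BloomFilter:
    """Minimal Bloom filter: k hash positions over an m-bit array.
    False positives are possible; false negatives are not."""

    def __init__(self, m_bits: int = 1 << 20, k_hashes: int = 5):
        self.m = m_bits
        self.k = k_hashes
        self.bits = bytearray(m_bits // 8)

    def _positions(self, key: str):
        # Derive k positions from two MD5 halves (double hashing).
        digest = hashlib.md5(key.encode()).digest()
        h1 = int.from_bytes(digest[:8], 'big')
        h2 = int.from_bytes(digest[8:], 'big')
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, key: str):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        # False → key is definitely not in this SSTable; skip the disk read.
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

The store keeps one filter per SSTable; a might_contain returning False lets the read path skip that file entirely.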

A single-server store can handle ~100K–1M ops/sec depending on hardware. But for our requirements (millions of ops/sec, petabytes of data, high availability), we need to distribute it across many machines.

Data Partitioning: Consistent Hashing

The fundamental question in a distributed KV store: given a key, which server stores it? We need a partitioning strategy that spreads keys evenly across nodes and remaps as few keys as possible when nodes join or leave.

Why Not Simple Hashing?

The naive approach server = hash(key) % N is catastrophic for distributed systems:

# With 4 servers: hash(key) % 4
# Key "user:123" → hash = 9 → server 1
# Key "user:456" → hash = 7 → server 3

# Now add server 5: hash(key) % 5
# Key "user:123" → hash = 9 → server 4  ← MOVED!
# Key "user:456" → hash = 7 → server 2  ← MOVED!

# Result: ~80% of keys need to be remapped ((N-1)/N with N=5)
# This causes a massive cache miss storm ("thundering herd")

Consistent Hashing

Consistent hashing maps both keys and servers onto a circular hash space (the "hash ring"). A key is stored on the first server encountered clockwise from the key's position on the ring.

Hash space:     [0, 2³² - 1] arranged as a circle (ring)
Node position:  hash(node_IP) → position on ring
Key position:   hash(key)     → position on ring
Assignment:     key → walk clockwise → first node encountered = owner

When a node joins or leaves, only the keys between the new/departed node and its predecessor need to be remapped. This is O(K/N) keys (where K = total keys, N = total nodes), compared to O(K) for naive hashing.

Virtual Nodes (vnodes)

With physical nodes alone, data distribution is uneven — some nodes get more keys than others, especially with few nodes. Virtual nodes solve this by mapping each physical node to multiple positions on the ring.

# Without vnodes: 3 physical nodes, uneven distribution
Ring: ────N1──────────────────N2────N3────────────
          ↑ owns ~60% of ring      ↑ owns ~10%

# With vnodes: each physical node gets ~150 virtual nodes
Ring: ─N1a─N2c─N3a─N1b─N3b─N2a─N1c─N2b─N3c─N1d─...
       Much more uniform distribution!

| Aspect | Without vnodes | With vnodes (~150 per node) |
|---|---|---|
| Data distribution | Highly uneven | Near-uniform (std dev < 5%) |
| Node addition/removal | One node gets all the load | Load spreads across many nodes |
| Heterogeneous hardware | Can't account for it | More vnodes for more powerful nodes |
| Metadata overhead | Minimal | Moderate (ring table is larger) |

Dynamo uses vnodes extensively. The number of virtual nodes per physical node is proportional to the node's capacity. A beefy machine with 256 GB RAM might get 256 vnodes, while a smaller one gets 64. This ensures data distribution matches resource capacity.
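
To make the ring concrete, here is a minimal sketch of consistent hashing with virtual nodes. It is illustrative rather than production code: MD5 stands in for a stable hash function, and the class and parameter names are ours.

import bisect
import hashlib

def stable_hash(s: str) -> int:
    # Stable across processes (unlike Python's built-in hash() for str).
    return int.from_bytes(hashlib.md5(s.encode()).digest()[:4], 'big')  # [0, 2^32)

class ConsistentHashRing:
    def __init__(self, vnodes_per_node: int = 150):
        self.vnodes = vnodes_per_node
        self.ring = []   # sorted list of (position, physical_node)

    def add_node(self, node: str):
        # Each physical node lands on the ring at many vnode positions.
        for i in range(self.vnodes):
            bisect.insort(self.ring, (stable_hash(f"{node}#vnode{i}"), node))

    def remove_node(self, node: str):
        # Only this node's vnode ranges get remapped: O(K/N) keys move.
        self.ring = [(p, n) for p, n in self.ring if n != node]

    def lookup(self, key: str) -> str:
        # Owner = first vnode clockwise from the key's position (wrapping).
        i = bisect.bisect_left(self.ring, (stable_hash(key),)) % len(self.ring)
        return self.ring[i][1]

ring = ConsistentHashRing()
for n in ("N1", "N2", "N3"):
    ring.add_node(n)
owner = ring.lookup("user:42")   # e.g. 'N2'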


Data Replication

To achieve high availability and durability, each key is replicated to N nodes (typically N=3). The key's coordinator (the node it maps to on the ring) replicates the data to the next N-1 nodes clockwise on the ring.

Replication Factor N = 3

Hash Ring:  ... → N1 → N2 → N3 → N4 → N5 → ...

Key "user:42" maps to N1 (coordinator)
  → Replica 1: N1 (coordinator)
  → Replica 2: N2 (next clockwise)
  → Replica 3: N3 (next clockwise after N2)

These 3 nodes form the "preference list" for this key.

Preference List

The preference list is the ordered list of nodes responsible for storing a particular key. With virtual nodes, we skip vnodes belonging to the same physical node:

import bisect

def get_preference_list(key, ring, N=3):
    """Return N distinct physical nodes for this key.

    `ring` is a sorted list of (position, physical_node) vnode entries,
    like ConsistentHashRing.ring in the sketch above.
    """
    pos = stable_hash(key)
    start = bisect.bisect_left(ring, (pos,)) % len(ring)

    preference_list = []
    seen_physical = set()

    # Walk clockwise, skipping vnodes of already-chosen physical nodes.
    for step in range(len(ring)):
        _, physical = ring[(start + step) % len(ring)]
        if physical not in seen_physical:
            preference_list.append(physical)
            seen_physical.add(physical)
            if len(preference_list) == N:
                break

    return preference_list

Critical detail for interviews: When using virtual nodes, the preference list must skip vnodes belonging to the same physical node. Otherwise, you might "replicate" to 3 vnodes all on the same physical machine — defeating the entire purpose of replication!

Replication Strategy

Replicas can be placed using different strategies:

  • Ring-based (default): place copies on the next N-1 distinct physical nodes clockwise, as above.
  • Rack- or AZ-aware: skip nodes until replicas span different racks or availability zones, so a single rack or AZ failure cannot take out all copies.
  • Multi-datacenter: keep at least one replica in a remote datacenter for disaster recovery (Cassandra's NetworkTopologyStrategy works this way).

Quorum Consensus

With data replicated across N nodes, how do we ensure consistency? The answer is quorum-based reads and writes.

The Parameters: N, W, R

| Parameter | Meaning | Typical Value |
|---|---|---|
| N | Total number of replicas | 3 |
| W | Write quorum — number of replicas that must acknowledge a write | 2 |
| R | Read quorum — number of replicas that must respond to a read | 2 |

The Quorum Condition

W + R > N  →  Strong consistency guaranteed

Why? If W nodes have the write and R nodes are read, and W + R > N,
then at least one node must be in BOTH sets → that node has the latest value.

Example: N=3, W=2, R=2 → 2+2=4 > 3 ✓
  Write acknowledged by {N1, N2}
  Read queries {N2, N3}
  Overlap: N2 has the latest value → client sees it

Tunable Consistency

The beauty of quorum-based systems is that consistency is tunable per-request:

| Configuration | W | R | W+R > N? | Trade-off |
|---|---|---|---|---|
| Strong consistency | 2 | 2 | 4 > 3 ✓ | Every read returns the latest write. Higher latency. |
| Write-heavy | 1 | 3 | 4 > 3 ✓ | Fast writes (only 1 ACK needed), slow reads (must read all 3). Good for write-heavy workloads. |
| Read-heavy | 3 | 1 | 4 > 3 ✓ | Slow writes (all 3 must ACK), fast reads (any 1 node). Good for read-heavy workloads. |
| Eventual consistency | 1 | 1 | 2 > 3 ✗ | Fastest reads and writes. Stale reads possible. Fine for caching/session data. |
| Strong write durability | 3 | 3 | 6 > 3 ✓ | Maximum safety — all replicas must agree on both reads and writes. Slowest but safest. |

DynamoDB's approach: DynamoDB defaults to eventually consistent reads (R=1) for performance but offers ConsistentRead=true which bumps R to a majority quorum. Writes always require W=2 for durability. This gives developers a per-request knob to trade consistency for latency.


Conflict Resolution: Vector Clocks

In an eventually consistent system, concurrent writes to the same key can happen on different replicas. We need a mechanism to detect and resolve these conflicts.

Why Timestamps Aren't Enough

Last-Write-Wins (LWW) using wall-clock timestamps seems simple but is fundamentally flawed:

  • Clock skew: nodes' clocks drift apart, so "last" is decided by whichever node's clock happens to run ahead, not by real causality.
  • Silent data loss: of two concurrent writes, one is discarded without a trace; neither client ever learns a conflict occurred.

Vector Clocks

A vector clock is a list of (node, counter) pairs that tracks the causal history of a value. Each node increments its own counter when it writes.

Vector Clock: D([S1, v1], [S2, v2], ..., [Sn, vn])

Rules:
  - When node Si writes, increment vi
  - When node Si receives a write from Sj, merge: take max of each counter

Comparison:
  - VC₁ < VC₂ (happened-before):  every counter in VC₁ ≤ VC₂, at least one strictly less
  - VC₁ ∥ VC₂ (concurrent):       neither VC₁ < VC₂ nor VC₂ < VC₁
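
These rules translate almost directly into code. A minimal sketch (the class and method names are ours):

class VectorClock:
    """Map of node_id → counter, tracking a value's causal history."""

    def __init__(self, counters=None):
        self.counters = dict(counters or {})

    def increment(self, node_id: str):
        # Node Si bumps its own counter on each write it coordinates.
        self.counters[node_id] = self.counters.get(node_id, 0) + 1

    def merge(self, other: "VectorClock") -> "VectorClock":
        # Element-wise max: used when reconciling versions.
        nodes = self.counters.keys() | other.counters.keys()
        return VectorClock({n: max(self.counters.get(n, 0),
                                   other.counters.get(n, 0)) for n in nodes})

    def happened_before(self, other: "VectorClock") -> bool:
        # self < other: every counter ≤ other's, at least one strictly less.
        nodes = self.counters.keys() | other.counters.keys()
        return (all(self.counters.get(n, 0) <= other.counters.get(n, 0) for n in nodes)
                and any(self.counters.get(n, 0) < other.counters.get(n, 0) for n in nodes))

    def concurrent_with(self, other: "VectorClock") -> bool:
        # Neither version dominates → siblings that must be reconciled.
        return not self.happened_before(other) and not other.happened_before(self)

a = VectorClock({"S1": 2, "S2": 1})   # {eggs, milk, cheese}
b = VectorClock({"S1": 2, "S3": 1})   # {eggs, butter}
assert a.concurrent_with(b)           # conflict → client must merge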

Example: Shopping Cart Conflict

Step 1: Client writes via node S1
        Value: {eggs}
        Clock: D([S1, 1])

Step 2: Client reads D([S1, 1]), writes via S1
        Value: {eggs, milk}
        Clock: D([S1, 2])

Step 3: Two concurrent updates (network partition between clients):

  Client A reads D([S1, 2]), writes via S2:
        Value: {eggs, milk, cheese}
        Clock: D([S1, 2], [S2, 1])

  Client B reads D([S1, 2]), writes via S3:
        Value: {eggs, butter}
        Clock: D([S1, 2], [S3, 1])

Step 4: Client reads both versions:
        D([S1, 2], [S2, 1])  →  {eggs, milk, cheese}
        D([S1, 2], [S3, 1])  →  {eggs, butter}

  These are CONCURRENT (neither dominates the other).
  Conflict! Client must resolve:
  → Merge: {eggs, milk, cheese, butter}
  → Write via S1: D([S1, 3], [S2, 1], [S3, 1])

Conflict Resolution Strategies

StrategyHow It WorksUsed By
Client-side resolutionClient receives all conflicting versions (siblings) and merges them. Most flexible but pushes complexity to the client.Riak, DynamoDB (optional)
Last-Write-Wins (LWW)Use timestamps; last write wins. Simple but loses data.Cassandra (default), DynamoDB (default)
Application-specific mergeCustom merge function (e.g., union for sets, max for counters). Requires domain knowledge.Riak CRDTs
CRDTsConflict-free Replicated Data Types: data structures that can be merged automatically without conflicts (G-Counter, OR-Set, etc.).Riak, Redis (CRDT module)
Vector clock growth problem: Vector clocks grow as more nodes coordinate writes to a key. To prevent unbounded growth, the Dynamo paper describes a clock truncation scheme: when the vector clock exceeds a threshold (e.g., 10 entries), the oldest entry (by timestamp) is removed. This can lose causality information in rare cases but keeps metadata bounded.

Write and Read Paths

Write Path

Client → Any Node (Coordinator) → Replicate to N nodes

Detailed steps:
  1. Client sends put(key, value) to ANY node in the cluster.
     (All nodes are equal — no master!)

  2. Receiving node becomes the "coordinator" for this request.
     It uses consistent hashing to identify the N replicas.

  3. Coordinator sends the write to ALL N replicas in parallel.

  4. Coordinator waits for W acknowledgments.
     (W=2 for N=3 typically)

  5. Once W ACKs received → respond SUCCESS to client.
     The remaining N-W replicas will receive the write eventually.

  6. Each replica's local write path:
     a. Write to commit log / WAL (append-only, for durability)
     b. Write to MemTable (in-memory sorted structure)
     c. When MemTable is full → flush to SSTable on disk
     d. Acknowledge the coordinator
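
Steps 3–5 (fan out to all N replicas, return after W ACKs) might look like the sketch below; send_to_replica is a hypothetical RPC stub, and the timeout value is illustrative:

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

def send_to_replica(node, key, value) -> bool:
    """Hypothetical RPC stub: returns True when the replica ACKs the write."""
    raise NotImplementedError

def coordinate_write(key, value, replicas, W=2, timeout_s=0.05):
    """Fan the write out to all N replicas; succeed once W of them ACK."""
    pool = ThreadPoolExecutor(max_workers=len(replicas))
    futures = [pool.submit(send_to_replica, node, key, value)
               for node in replicas]
    acks = 0
    try:
        for future in as_completed(futures, timeout=timeout_s):
            try:
                acked = future.result()
            except Exception:
                acked = False            # unreachable replica: no ACK
            if acked:
                acks += 1
                if acks >= W:
                    break                # quorum reached; stragglers keep running
    except TimeoutError:
        pass                             # fewer than W replicas answered in time
    pool.shutdown(wait=False)            # don't block on slow replicas
    return acks >= W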

Read Path

Client → Any Node (Coordinator) → Read from R replicas → Resolve → Return

Detailed steps:
  1. Client sends get(key) to ANY node.

  2. Coordinator identifies the N replicas using consistent hashing.

  3. Coordinator sends read requests to ALL N replicas in parallel.

  4. Coordinator waits for R responses.

  5. If all R responses have the same value:
     → Return the value to the client.

  6. If responses differ (stale replicas):
     → Use vector clocks to determine the latest version.
     → If concurrent versions exist → return all siblings to client.
     → Trigger "read repair": send the latest value to stale replicas.

  7. Each replica's local read path:
     a. Check MemTable first (in-memory)
     b. If not found → check Bloom filter for each SSTable
     c. If Bloom filter says "maybe" → check SSTable on disk
     d. Return the value (or not-found)

Read Repair

Read repair is an opportunistic consistency mechanism. During a read, if the coordinator detects that some replicas have stale data, it sends the latest version to those replicas. This gradually repairs inconsistencies without any background process.

Read Repair Example:
  Key "user:42" has value v5 on N1, N2 but v3 (stale) on N3.

  1. Coordinator reads from N1 (v5), N2 (v5), N3 (v3)
  2. Coordinator sees N3 is stale
  3. Returns v5 to client
  4. Asynchronously sends v5 to N3 → N3 updates to v5

  Result: Next read from N3 will return the correct value.
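
Putting the read path and read repair together, a simplified coordinator might look like this sketch. read_from_replica and repair_replica are hypothetical RPC stubs, and each response is a (value, VectorClock) pair using the VectorClock sketch from earlier:

def read_from_replica(node, key):
    """Hypothetical RPC stub → (value, VectorClock)."""
    raise NotImplementedError

def repair_replica(node, key, value, clock):
    """Hypothetical RPC stub: push the latest version to a stale replica."""
    raise NotImplementedError

def coordinate_read(key, replicas, R=2):
    responses = []                       # (node, value, clock)
    for node in replicas:                # a real coordinator fans out in parallel
        value, clock = read_from_replica(node, key)
        responses.append((node, value, clock))
        if len(responses) >= R:
            break

    if len({v for _, v, _ in responses}) == 1:
        return responses[0][1]           # all R replicas agree (common case)

    # Keep only versions that no other response causally dominates.
    latest = [(n, v, c) for n, v, c in responses
              if not any(c.happened_before(c2) for _, _, c2 in responses)]

    if len(latest) > 1:
        return latest                    # concurrent siblings → client resolves

    _, winner_value, winner_clock = latest[0]
    # Read repair: push the winning version to the stale replicas we saw.
    for node, _, clock in responses:
        if clock.happened_before(winner_clock):
            repair_replica(node, key, winner_value, winner_clock)  # async in practice
    return winner_value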

Handling Temporary Failures

Nodes go down temporarily all the time: network blips, garbage collection pauses, rolling deployments. The system must remain available during these transient failures.

Sloppy Quorum

In a strict quorum, writes and reads must involve the exact nodes in the preference list. If one of those nodes is down, the operation fails. A sloppy quorum relaxes this: if a node in the preference list is unreachable, the request is sent to the next healthy node on the ring (not in the original preference list).

Strict Quorum (preference list: [N1, N2, N3]):
  N2 is down → write to N1, N3 → only 2 nodes → W=2 satisfied ✓

  But what if N2 AND N3 are down?
  Strict: write fails! Only N1 is available.
  Sloppy: write to N1, N4, N5 (next healthy nodes on ring) → W=2 satisfied ✓

Hinted Handoff

When a sloppy quorum sends data to a stand-in node (e.g., N4 holds data meant for N2), that node stores a hint — metadata indicating the intended recipient. When N2 comes back online, N4 forwards the data to N2 and deletes its local copy.

Hinted Handoff Flow:
  1. N2 goes down.
  2. Write for key K (preference list: [N1, N2, N3]):
     → Coordinator sends to N1, N4 (stand-in for N2), N3
     → N4 stores: { key: K, value: V, hint: "intended for N2" }

  3. N2 comes back online.
  4. N4 detects N2 is alive (via gossip protocol).
  5. N4 sends the hinted data to N2.
  6. N2 stores the data normally.
  7. N4 deletes the hint.

Result: No data loss, no unavailability, eventual consistency maintained.

Sloppy quorum + hinted handoff is what allows Dynamo-style stores to be "always writable." Even during major outages, writes succeed by using stand-in nodes. This is the core mechanism behind Amazon's shopping cart being available even when entire data centers go down.
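
A stand-in node's hint bookkeeping can be as simple as this sketch (replicate_to is a hypothetical RPC stub):

import time

def replicate_to(node, key, value) -> bool:
    """Hypothetical RPC stub: returns True once `node` stores the pair."""
    raise NotImplementedError

class HintStore:
    """Hints kept by a stand-in node for writes it accepted on behalf
    of an unreachable replica."""

    def __init__(self):
        self.hints = []   # (intended_node, key, value, stored_at)

    def store_hint(self, intended_node, key, value):
        # A sloppy-quorum write landed here instead of intended_node.
        self.hints.append((intended_node, key, value, time.time()))

    def deliver_hints(self, alive_nodes):
        # Called when gossip reports nodes back online (step 4 above).
        remaining = []
        for intended, key, value, stored_at in self.hints:
            if intended in alive_nodes and replicate_to(intended, key, value):
                continue                  # delivered → drop the local hint
            remaining.append((intended, key, value, stored_at))
        self.hints = remaining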

Handling Permanent Failures: Merkle Trees

Hinted handoff handles temporary failures. But what about permanent failures — a disk dies, a node is decommissioned, or data gets corrupted? We need a way to detect and repair long-term data divergence between replicas.

The Problem

Comparing all keys between two replicas is expensive. If each replica has billions of keys, sending all keys over the network is impractical. We need a way to efficiently identify exactly which keys differ.

Anti-Entropy with Merkle Trees

A Merkle tree (hash tree) is a binary tree where each leaf contains the hash of a data block, and each internal node contains the hash of its children. This allows efficient comparison:

Merkle Tree Structure:
                    Root Hash
                   /          \
            Hash(L+R)        Hash(L+R)
            /     \          /     \
       Hash(K1)  Hash(K2)  Hash(K3)  Hash(K4)
         |         |         |         |
        K1:V1    K2:V2     K3:V3    K4:V4

Key range is divided into buckets.
Each leaf = hash of all key-value pairs in that bucket.
Each parent = hash(left_child || right_child).

Comparison Algorithm

  1. Compare roots: If root hashes match → replicas are identical. Done! (O(1))
  2. Roots differ: Compare left and right children.
  3. Recursively descend into subtrees with different hashes.
  4. At the leaves: We've identified the exact key ranges that differ.
  5. Sync only the divergent ranges.

Complexity:
  - If replicas are identical: O(1) — just compare root hashes
  - If k keys differ out of N total: O(k · log N) — descend to divergent leaves
  - Data transferred: only the differing key-value pairs

vs. naive comparison: O(N) network + O(N) comparison.
Merkle trees turn an O(N) problem into O(log N) in the common case.
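
A toy version of the build and compare steps, assuming a power-of-two number of leaf buckets (function names are ours):

import hashlib

def bucket_hash(pairs):
    """Hash all key-value pairs in one leaf bucket (sorted for determinism)."""
    h = hashlib.sha256()
    for k, v in sorted(pairs):
        h.update(f"{k}={v};".encode())
    return h.digest()

def build_merkle(leaf_hashes):
    """Bottom-up: each level hashes pairs of children. Assumes a
    power-of-two leaf count. Returns all levels; levels[-1][0] is the root."""
    levels = [leaf_hashes]
    while len(levels[-1]) > 1:
        prev = levels[-1]
        levels.append([hashlib.sha256(prev[i] + prev[i + 1]).digest()
                       for i in range(0, len(prev), 2)])
    return levels

def diff_buckets(tree_a, tree_b):
    """Descend from the roots, following only mismatched hashes;
    return the indices of divergent leaf buckets."""
    if tree_a[-1][0] == tree_b[-1][0]:
        return []                        # identical replicas: O(1)
    divergent = [0]
    for level in range(len(tree_a) - 2, -1, -1):
        divergent = [c for i in divergent for c in (2 * i, 2 * i + 1)
                     if tree_a[level][c] != tree_b[level][c]]
    return divergent

leaves_a = [bucket_hash([("k1", "v1")]), bucket_hash([("k2", "v2")])]
leaves_b = [bucket_hash([("k1", "v1")]), bucket_hash([("k2", "stale")])]
assert diff_buckets(build_merkle(leaves_a), build_merkle(leaves_b)) == [1]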

Merkle Tree Anti-Entropy Process

Anti-Entropy Repair:
  1. Each node maintains a Merkle tree per key range it owns.
  2. Periodically (e.g., every 10 minutes), a node picks a random
     replica and initiates a Merkle tree comparison.

  3. Exchange root hashes:
     Node A root: 0xABCD
     Node B root: 0xABCF  ← different!

  4. Exchange children hashes:
     A.left = 0x1234,  B.left = 0x1234  ← same (skip)
     A.right = 0x5678, B.right = 0x5679 ← different (descend)

  5. Continue descending until divergent leaves are found.

  6. Node with stale data requests the correct key-value pairs.

  7. Trees are rebuilt when data changes (incrementally or periodically).

Cassandra's implementation: Cassandra maintains one Merkle tree per column family (table) per node. Anti-entropy repair runs via nodetool repair. During repair, it builds Merkle trees by scanning all SSTables, compares them between replicas, and streams only the differences. This is critical after replacing a dead node or recovering from long outages.


Failure Detection: Gossip Protocol

In a decentralized system with no master, how does a node know which other nodes are alive or dead? The answer is the gossip protocol, an epidemic-style communication mechanism.

How Gossip Works

  1. Each node maintains a membership list: a table of all known nodes and their heartbeat counters.
  2. Periodically (e.g., every 1 second), each node increments its own heartbeat counter.
  3. Each node randomly selects another node and sends its entire membership list.
  4. The receiving node merges the lists: for each node, keep the higher heartbeat counter.
  5. If a node's heartbeat hasn't been updated for a threshold period (e.g., 30 seconds), it's marked as suspected failed.
  6. After another timeout (e.g., 60 seconds without recovery), it's marked as permanently failed and removed.

Gossip Protocol Example:

Time 0: Node A membership list:
  A: heartbeat=100, timestamp=T0
  B: heartbeat=95,  timestamp=T0
  C: heartbeat=80,  timestamp=T0-5s
  D: heartbeat=70,  timestamp=T0-40s  ← suspected failed!

Time 1: A randomly picks B and sends its list.
  B merges: keeps higher heartbeat for each node.
  B now knows D might be failed too.

Time 2: B gossips to C with merged list.
  C now also knows about D's potential failure.

→ Within O(log N) gossip rounds, ALL nodes know D is suspected failed.
   (This is the mathematical beauty of epidemic protocols.)
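
The merge rule at the heart of gossip is tiny. A sketch (the class name and threshold are ours; Cassandra actually uses a phi-accrual detector rather than a fixed timeout):

import random
import time

SUSPECT_AFTER_S = 30   # threshold from the text; value is illustrative

class GossipNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers   # other GossipNode instances
        # membership: node_id → (heartbeat_counter, local time last updated)
        self.membership = {node_id: (0, time.time())}

    def tick(self):
        """Runs every second: bump own heartbeat, gossip to a random peer."""
        hb, _ = self.membership[self.node_id]
        self.membership[self.node_id] = (hb + 1, time.time())
        random.choice(self.peers).receive(dict(self.membership))

    def receive(self, remote_membership):
        # Merge rule: for each node, keep the higher heartbeat counter.
        for node, (hb, _) in remote_membership.items():
            local_hb = self.membership.get(node, (-1, 0))[0]
            if hb > local_hb:
                self.membership[node] = (hb, time.time())

    def suspected_failed(self):
        now = time.time()
        return [n for n, (_, seen) in self.membership.items()
                if now - seen > SUSPECT_AFTER_S]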

Failure Detection States

State Machine:
  ALIVE  →  SUSPECTED  →  FAILED  →  REMOVED
           (no heartbeat    (timeout     (cleanup)
            for T1 sec)     for T2 sec)

Advantages of Gossip

  • Fully decentralized: no single point of failure, no master monitoring the cluster
  • Scalable: each node sends a constant number of messages per round, and information spreads in O(log N) rounds
  • Robust: tolerates message loss and node churn, since rumors route around failures

Gossip carries more than heartbeats. In Cassandra and DynamoDB, gossip also propagates: cluster topology changes (new nodes joining/leaving), schema changes, token/partition ownership, and load information for query routing. It's the nervous system of the cluster.

Coordinator and Request Routing

In a Dynamo-style system, any node can serve any request. There's no master/slave distinction. This is fundamental to the availability guarantee.

Request Flow

Two routing strategies:

1. Client-side routing (used by DynamoDB):
   Client knows the ring topology (via partition map).
   Client sends request directly to the coordinator.
   → Lowest latency (1 hop to coordinator → N-1 hops to replicas)

2. Generic load balancer (simpler):
   Client → Load Balancer → Random Node → Forward to Coordinator
   → Extra hop but client doesn't need ring knowledge

Coordinator responsibilities:
  - Identify the N replicas for the key
  - Send read/write to all N replicas in parallel
  - Wait for W/R responses
  - Handle conflict resolution (for reads)
  - Trigger read repair (for stale replicas)
  - Return result to client

Request Routing with Partition Map

RING_SIZE = 2 ** 32   # size of the hash space

# Partition map: every node knows which token ranges map to which nodes
# (illustrative ranges; a real map covers the whole ring)
partition_map = {
    (0, 1000):     ['N1', 'N2', 'N3'],
    (1001, 2000):  ['N2', 'N3', 'N4'],
    (2001, 3000):  ['N3', 'N4', 'N5'],
    # ... more ranges
}

def route_request(key):
    token = stable_hash(key) % RING_SIZE   # stable hash, as earlier
    for (start, end), nodes in partition_map.items():
        if start <= token <= end:
            return nodes[0]   # coordinator = first in preference list
    raise KeyError(f"no partition covers token {token}")

Complete Architecture

Let's bring all the components together:

┌────────────────────────────────────────────────────────────────────┐
│                        CLIENT LAYER                                │
│  Client → (Partition Map) → Choose Coordinator Node                │
└──────────────────────────────┬─────────────────────────────────────┘
                               │
┌──────────────────────────────▼─────────────────────────────────────┐
│                      COORDINATOR NODE                              │
│  ┌───────────────┐  ┌──────────────┐  ┌──────────────────────┐    │
│  │ Request Router │  │ Quorum Logic │  │ Conflict Resolution  │    │
│  │ (hash ring    │  │ (W, R params)│  │ (vector clocks,      │    │
│  │  + pref list) │  │              │  │  read repair)        │    │
│  └───────────────┘  └──────────────┘  └──────────────────────┘    │
└──────────────────────────────┬─────────────────────────────────────┘
                               │ Replicate to N nodes
        ┌──────────────────────┼──────────────────────┐
        ▼                      ▼                      ▼
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│   REPLICA 1  │    │   REPLICA 2  │    │   REPLICA 3  │
│  ┌────────┐  │    │  ┌────────┐  │    │  ┌────────┐  │
│  │  WAL   │  │    │  │  WAL   │  │    │  │  WAL   │  │
│  └────────┘  │    │  └────────┘  │    │  └────────┘  │
│  ┌────────┐  │    │  ┌────────┐  │    │  ┌────────┐  │
│  │MemTable│  │    │  │MemTable│  │    │  │MemTable│  │
│  └────────┘  │    │  └────────┘  │    │  └────────┘  │
│  ┌────────┐  │    │  ┌────────┐  │    │  ┌────────┐  │
│  │SSTables│  │    │  │SSTables│  │    │  │SSTables│  │
│  └────────┘  │    │  └────────┘  │    │  └────────┘  │
│  ┌────────┐  │    │  ┌────────┐  │    │  ┌────────┐  │
│  │ Bloom  │  │    │  │ Bloom  │  │    │  │ Bloom  │  │
│  │Filters │  │    │  │Filters │  │    │  │Filters │  │
│  └────────┘  │    │  └────────┘  │    │  └────────┘  │
└──────────────┘    └──────────────┘    └──────────────┘

Background Processes (on every node):
  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
  │   Gossip     │  │  Anti-Entropy│  │  Compaction   │
  │  Protocol    │  │  (Merkle     │  │  (SSTable     │
  │  (failure    │  │   trees)     │  │   merge)      │
  │  detection)  │  │              │  │              │
  └──────────────┘  └──────────────┘  └──────────────┘
  ┌──────────────┐  ┌──────────────┐
  │   Hinted     │  │  Membership  │
  │   Handoff    │  │  Management  │
  │  (temp fail  │  │  (ring       │
  │   recovery)  │  │   changes)   │
  └──────────────┘  └──────────────┘

Failure Scenarios & Handling

Scenario 1: Single Node Failure (Transient)

Problem: Node N2 crashes and restarts after 5 minutes.
Solution:
  1. Gossip protocol detects N2 is unresponsive within ~30 seconds.
  2. Writes to N2's keys use sloppy quorum → sent to N4 (stand-in).
  3. N4 stores data with hinted handoff metadata.
  4. N2 restarts, gossip detects it's alive.
  5. N4 forwards hinted data to N2.
  6. N2 is fully caught up. No data lost.

Scenario 2: Permanent Node Failure

Problem: Node N2's disk dies. N2 is replaced with a new empty node.
Solution:
  1. New N2 joins the ring at N2's position (or gets reassigned vnodes).
  2. Anti-entropy process kicks in:
     a. New N2 builds empty Merkle trees.
     b. Compares with N1 and N3 → massive divergence.
     c. Streams all data for N2's key ranges from N1 and N3.
  3. Full data transfer may take hours for large datasets.
  4. During transfer, sloppy quorum ensures availability.
  5. Once synchronized, N2 is fully operational.

Scenario 3: Network Partition

Problem: Network partition splits cluster into two groups:
  Group A: [N1, N2]    Group B: [N3, N4, N5]

For a key with preference list [N1, N3, N5]:
  - Group A can reach N1 but not N3 or N5
  - Group B can reach N3 and N5 but not N1

With sloppy quorum:
  - Group A: write to N1 + N2 (stand-in for N3) → W=2 satisfied ✓
  - Group B: write to N3 + N5 → W=2 satisfied ✓
  → Both sides can write! But we now have CONFLICTING versions.

After partition heals:
  - Anti-entropy detects divergence via Merkle trees.
  - Vector clocks identify concurrent writes.
  - Conflict resolution kicks in (client-side merge or LWW).
  → Data converges to a single consistent state.

Scenario 4: Datacenter Failure

Problem: Entire datacenter (AZ) goes offline.
Solution:
  - Cross-AZ replication ensures N replicas span multiple AZs.
  - Remaining AZs have enough replicas for quorum (if N ≥ 3 and AZs ≥ 3).
  - Sloppy quorum uses healthy nodes in other AZs.
  - When AZ recovers, hinted handoff + anti-entropy repair data.
  - DynamoDB: 3 replicas across 3 AZs in a region by default.

| Failure Type | Detection | Availability Mechanism | Repair Mechanism |
|---|---|---|---|
| Transient node failure | Gossip (heartbeat timeout) | Sloppy quorum | Hinted handoff |
| Permanent node failure | Gossip (extended timeout) | Sloppy quorum | Merkle tree anti-entropy |
| Network partition | Gossip (asymmetric detection) | Sloppy quorum on both sides | Vector clock conflict resolution |
| Datacenter failure | Cross-DC gossip | Cross-AZ replication | Hinted handoff + anti-entropy |

Performance Optimizations

Write Path Optimizations

  • All disk I/O on the write path is sequential: WAL appends plus in-memory MemTable updates, with SSTable flushes deferred to the background
  • Batch writes and group WAL fsyncs to amortize disk syncs
  • Lower W (e.g., W=1) when the workload tolerates weaker durability

Read Path Optimizations

  • Bloom filters skip SSTables that cannot contain the key
  • A block cache keeps hot SSTable blocks in memory
  • Read repair piggybacks fixes on normal reads, keeping replicas fresh
  • Lower R (e.g., R=1) for latency-sensitive reads that tolerate staleness

Compaction Strategies

| Strategy | Description | Best For |
|---|---|---|
| Size-tiered (STCS) | Merge SSTables of similar size. Write-optimized. Higher space amplification. | Write-heavy workloads |
| Leveled (LCS) | Organize SSTables into levels; each level is 10× the previous. Read-optimized. Lower space amplification. | Read-heavy workloads |
| FIFO | Delete oldest SSTables. For time-series data with TTL. | Time-series / TTL data |
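
Whatever the strategy, the core of compaction is a sorted merge that drops shadowed versions and tombstones. A sketch (note that dropping a tombstone is only safe when the compaction covers every SSTable that might still hold the deleted key):

TOMBSTONE = object()   # marker written by deletes

def compact(*sstables):
    """Merge sorted SSTables into one. `sstables` are ordered newest
    first; each is a sorted list of (key, value), TOMBSTONE for deletes."""
    # Tag each entry with its table's age so the newest version of a
    # key sorts first among duplicates.
    entries = sorted((key, age, value)
                     for age, table in enumerate(sstables)
                     for key, value in table)
    merged, last_key = [], None
    for key, _, value in entries:
        if key == last_key:
            continue                   # older duplicate → shadowed, drop
        last_key = key
        if value is not TOMBSTONE:     # deleted keys vanish from output
            merged.append((key, value))
    return merged

newest = [("k1", "v1-new"), ("k2", TOMBSTONE)]
older  = [("k1", "v1-old"), ("k2", "v2"), ("k3", "v3")]
assert compact(newest, older) == [("k1", "v1-new"), ("k3", "v3")]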

Real-World Systems Comparison

| Feature | DynamoDB | Cassandra | Riak |
|---|---|---|---|
| Partitioning | Consistent hashing + adaptive capacity | Consistent hashing + vnodes | Consistent hashing + vnodes |
| Replication | 3 replicas across 3 AZs | Configurable N (default 3) | Configurable N (default 3) |
| Consistency | Eventual (default) + strong (optional) | Tunable (ANY to ALL) | Tunable (N, R, W) |
| Conflict resolution | LWW (default) + conditional writes | LWW (timestamps) | Vector clocks + siblings |
| Failure detection | Gossip + external monitoring | Gossip (Phi accrual detector) | Gossip |
| Anti-entropy | Proprietary (continuous) | Merkle trees (nodetool repair) | Merkle trees (active anti-entropy) |
| Write path | WAL + B-tree (not LSM) | WAL + MemTable + SSTables (LSM) | WAL + LSM (Bitcask/LevelDB) |
| Managed? | Fully managed (AWS) | Self-managed or DataStax Astra | Self-managed |

DynamoDB's evolution: The original Dynamo paper (2007) used vector clocks for conflict detection. Modern DynamoDB has largely moved to Last-Write-Wins with conditional updates (ConditionExpression) for conflict-free writes. This simplifies the client code at the cost of flexibility. For shopping-cart-like use cases, DynamoDB now recommends using transactions (TransactWriteItems) instead of vector clock merges.

Summary & Interview Cheat Sheet

| Component | Technique | Purpose |
|---|---|---|
| Data partitioning | Consistent hashing + virtual nodes | Even distribution, minimal remapping |
| Data replication | N replicas on clockwise ring nodes | High availability, durability |
| Consistency | Quorum consensus (W + R > N) | Tunable consistency per request |
| Conflict detection | Vector clocks | Detect concurrent writes without data loss |
| Conflict resolution | Client-side merge / LWW / CRDTs | Reconcile divergent replicas |
| Failure detection | Gossip protocol | Decentralized, scalable liveness checks |
| Temporary failures | Sloppy quorum + hinted handoff | "Always writable" — no downtime |
| Permanent failures | Merkle tree anti-entropy | Efficient data repair between replicas |
| Storage engine | LSM tree (WAL + MemTable + SSTables) | High write throughput, crash recovery |
| Read optimization | Bloom filters + block cache | Reduce unnecessary disk reads |

Key Interview Tips

✅ Do

  • Start with requirements and scale estimation
  • Begin with single-server design, then distribute
  • Explain the CAP trade-off and justify choosing AP
  • Draw the hash ring and show replication
  • Explain quorum math: W + R > N
  • Discuss failure handling (the core of this design)
  • Mention specific numbers: N=3, W=2, R=2

❌ Don't

  • Jump to distributed design without single-server first
  • Forget to explain virtual nodes with consistent hashing
  • Hand-wave on conflict resolution — it's critical
  • Ignore failure scenarios (interviewers will ask)
  • Confuse sloppy quorum with strict quorum
  • Skip Merkle trees — they differentiate strong answers
  • Say "just use DynamoDB" without explaining how it works