Design: Distributed Key-Value Store
Introduction
A distributed key-value (KV) store is one of the most fundamental and frequently asked system design interview topics. It underlies nearly every large-scale system: session stores, caching layers, configuration management, metadata services, and even primary databases. Systems like Amazon DynamoDB, Apache Cassandra, Riak, and Voldemort are all implementations of distributed KV stores.
This post is deeply rooted in the Amazon Dynamo paper (2007), which introduced many of the techniques we'll cover. If you can explain the design of a Dynamo-style KV store end-to-end, you can ace virtually any system design interview question on distributed storage.
What is a Key-Value Store?
A key-value store is a non-relational database where data is stored as a collection of key-value pairs. The key is a unique identifier, and the value is the data associated with that key. Think of it as a giant hash map, but distributed across multiple machines.
put(key, value) → Store the value associated with the key
get(key) → Retrieve the value associated with the key
delete(key) → Remove the key-value pair
Values can be anything: a string, a JSON blob, a serialized object, or even a binary blob. The store treats values as opaque — it doesn't inspect or index the contents.
Requirements
Functional Requirements
- put(key, value) — Store a key-value pair. If the key already exists, update the value.
- get(key) — Retrieve the value for a given key. Return null/error if the key doesn't exist.
- delete(key) — Remove a key-value pair from the store.
Non-Functional Requirements
| Requirement | Target | Why It Matters |
|---|---|---|
| High Availability | 99.99% uptime | Shopping cart, session store — must always be writable |
| Scalability | Horizontal, 10s–1000s of nodes | Data grows unboundedly; add nodes without downtime |
| Partition Tolerance | Must survive network partitions | CAP theorem — partitions are inevitable in distributed systems |
| Tunable Consistency | Eventual → strong (configurable) | Different use cases need different consistency levels |
| Low Latency | p99 < 10ms for reads and writes | Real-time applications demand sub-10ms response times |
| Durability | No data loss after acknowledged writes | Data must survive disk failures and node crashes |
Scale Assumptions
- Key size: up to 256 bytes
- Value size: up to 1 MB (optimized for small objects < 10 KB)
- Data volume: petabytes across thousands of nodes
- Throughput: millions of operations per second
- Latency SLA: 99.9th percentile < 10ms
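A quick back-of-envelope check makes the node count concrete. The figures below (1 PB of logical data, ~2 TB of usable storage per node, ~20K ops/sec per node) are illustrative assumptions for this sketch, not measurements from any specific system.

# Rough capacity math under the assumptions above (illustrative only)
logical_data_tb = 1000              # ~1 PB of logical data
replication_factor = 3              # N = 3 replicas
usable_tb_per_node = 2              # assumed usable storage per node

raw_tb = logical_data_tb * replication_factor        # 3,000 TB of raw storage
nodes_for_storage = raw_tb // usable_tb_per_node      # ~1,500 nodes

total_ops_per_sec = 5_000_000       # "millions of ops per second"
ops_per_node = 20_000               # assumed per-node throughput
nodes_for_throughput = total_ops_per_sec // ops_per_node   # ~250 nodes

print(max(nodes_for_storage, nodes_for_throughput))   # storage dominates: ~1,500 nodes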
Single-Server Key-Value Store
Before distributing anything, let's understand how a single-server KV store works. This is the building block.
In-Memory Hash Table
The simplest approach: keep everything in a hash table in memory.
class SingleServerKVStore:
    def __init__(self):
        self.store = {}  # hash map: key → value

    def put(self, key: str, value: bytes):
        self.store[key] = value

    def get(self, key: str) -> bytes:
        return self.store.get(key, None)

    def delete(self, key: str):
        self.store.pop(key, None)
Pros: O(1) reads and writes, dead simple. Cons: Data lost on crash, limited by single machine's RAM.
Durability: Write-Ahead Log (WAL)
To survive crashes, we add a Write-Ahead Log (WAL) — an append-only file on disk. Before modifying the in-memory hash table, we write the operation to the WAL.
import os

class DurableKVStore:
    def __init__(self, wal_path: str):
        self.store = {}
        self.wal = open(wal_path, 'a+b')
        self._recover()  # replay WAL on startup

    def put(self, key: str, value: bytes):
        # 1. Write to the WAL first (fsync for durability).
        #    A real WAL uses length-prefixed binary records; this text format
        #    breaks on keys with spaces or values containing newlines.
        self.wal.write(b"PUT " + key.encode() + b" " + value + b"\n")
        self.wal.flush()
        os.fsync(self.wal.fileno())
        # 2. Then update the in-memory hash table.
        self.store[key] = value

    def delete(self, key: str):
        # Record the deletion before applying it, same as put.
        self.wal.write(b"DEL " + key.encode() + b"\n")
        self.wal.flush()
        os.fsync(self.wal.fileno())
        self.store.pop(key, None)

    def _recover(self):
        """Replay the WAL to rebuild in-memory state after a crash."""
        self.wal.seek(0)
        for line in self.wal:
            parts = line.rstrip(b"\n").split(b" ", 2)
            if parts[0] == b"PUT":
                self.store[parts[1].decode()] = parts[2]
            elif parts[0] == b"DEL":
                self.store.pop(parts[1].decode(), None)
Handling Memory Limits: LSM Trees + SSTables
When data exceeds RAM, we use an LSM Tree (Log-Structured Merge Tree):
- MemTable — in-memory sorted tree (red-black tree or skip list). Writes go here first.
- When the MemTable exceeds a threshold (e.g., 64 MB), it's flushed to disk as an SSTable (Sorted String Table) — an immutable, sorted file.
- Reads check the MemTable first, then SSTables from newest to oldest.
- Compaction periodically merges SSTables to reclaim space and reduce read amplification.
Write Path: Client → WAL (append) → MemTable (sorted insert)
↓ (when full)
Flush to SSTable on disk
Read Path: Client → MemTable → SSTable₁ → SSTable₂ → ... → SSTableₙ
(newest first, stop on first match)
Compaction: SSTable₁ + SSTable₂ → Merged SSTable (remove duplicates/tombstones)
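A minimal sketch of that read/write flow, with a plain dict standing in for the MemTable and a list of frozen dicts standing in for SSTables; the WAL, Bloom filters, realistic flush thresholds, and compaction are deliberately omitted.

class TinyLSM:
    """Toy LSM tree: MemTable + immutable 'SSTables' searched newest-first."""

    def __init__(self, memtable_limit=4):
        self.memtable_limit = memtable_limit
        self.memtable = {}        # in-memory, mutable (stands in for a sorted tree)
        self.sstables = []        # immutable snapshots on "disk", newest first

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:   # flush when "full"
            self.sstables.insert(0, dict(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:                         # 1. check the MemTable
            return self.memtable[key]
        for sstable in self.sstables:                    # 2. SSTables, newest first
            if key in sstable:
                return sstable[key]
        return None                                      # 3. not found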
A single-server store can handle ~100K–1M ops/sec depending on hardware. But for our requirements (millions of ops/sec, petabytes of data, high availability), we need to distribute it across many machines.
Data Partitioning: Consistent Hashing
The fundamental question in a distributed KV store: given a key, which server stores it? We need a partitioning strategy that:
- Distributes data evenly across nodes
- Minimizes data movement when nodes are added/removed
- Doesn't require a central authority
Why Not Simple Hashing?
The naive approach server = hash(key) % N is catastrophic for distributed systems:
# With 4 servers: hash(key) % 4
# Key "user:123" → hash = 9 → server 1
# Key "user:456" → hash = 7 → server 3
# Now add server 5: hash(key) % 5
# Key "user:123" → hash = 9 → server 4 ← MOVED!
# Key "user:456" → hash = 7 → server 2 ← MOVED!
# Result: ~80% of keys are remapped (in general, about N/(N+1) of keys move when one server is added)
# This causes a massive cache miss storm ("thundering herd")
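The remap fraction is easy to verify empirically. This snippet (key names are illustrative) hashes 100,000 synthetic keys and counts how many change servers when the cluster grows from 4 to 5 nodes.

import hashlib

def server_for(key: str, num_servers: int) -> int:
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return h % num_servers

keys = [f"user:{i}" for i in range(100_000)]
moved = sum(1 for k in keys if server_for(k, 4) != server_for(k, 5))
print(f"{moved / len(keys):.0%} of keys moved")   # ~80% in practice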
Consistent Hashing
Consistent hashing maps both keys and servers onto a circular hash space (the "hash ring"). A key is stored on the first server encountered clockwise from the key's position on the ring.
When a node joins or leaves, only the keys between the new/departed node and its predecessor need to be remapped. This is O(K/N) keys (where K = total keys, N = total nodes), compared to O(K) for naive hashing.
Virtual Nodes (vnodes)
With physical nodes alone, data distribution is uneven — some nodes get more keys than others, especially with few nodes. Virtual nodes solve this by mapping each physical node to multiple positions on the ring.
# Without vnodes: 3 physical nodes, uneven distribution
Ring: ────N1──────────────────N2────N3────────────
↑ owns ~60% of ring ↑ owns ~10%
# With vnodes: each physical node gets ~150 virtual nodes
Ring: ─N1a─N2c─N3a─N1b─N3b─N2a─N1c─N2b─N3c─N1d─...
Much more uniform distribution!
| Aspect | Without vnodes | With vnodes (~150 per node) |
|---|---|---|
| Data distribution | Highly uneven | Near-uniform (std dev < 5%) |
| Node addition/removal | One node gets all the load | Load spreads across many nodes |
| Heterogeneous hardware | Can't account for it | More vnodes for more powerful nodes |
| Metadata overhead | Minimal | Moderate (ring table is larger) |
▶ Consistent Hash Ring with Replication
Watch keys map to nodes on the ring, get replicated clockwise, and see how write/read paths work.
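Here is a minimal sketch of a hash ring with virtual nodes, using MD5 for placement and Python's bisect for the clockwise lookup. The HashRing class and its vnode naming scheme are assumptions of this sketch; replication and ring-metadata propagation are out of scope.

import bisect
import hashlib

def _hash(value: str) -> int:
    return int(hashlib.md5(value.encode()).hexdigest(), 16)

class HashRing:
    def __init__(self, nodes, vnodes_per_node=150):
        self.ring = []                       # sorted list of (position, physical_node)
        for node in nodes:
            for i in range(vnodes_per_node):
                self.ring.append((_hash(f"{node}#vnode{i}"), node))
        self.ring.sort()
        self.positions = [pos for pos, _ in self.ring]

    def lookup(self, key: str) -> str:
        """Return the first node clockwise from the key's position (wraps around)."""
        idx = bisect.bisect_right(self.positions, _hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = HashRing(["N1", "N2", "N3"])
print(ring.lookup("user:42"))                # stable as long as membership is stable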
Data Replication
To achieve high availability and durability, each key is replicated to N nodes (typically N=3). The key's coordinator (the node it maps to on the ring) replicates the data to the next N-1 nodes clockwise on the ring.
Replication Factor N = 3
Hash Ring: ... → N1 → N2 → N3 → N4 → N5 → ...
Key "user:42" maps to N1 (coordinator)
→ Replica 1: N1 (coordinator)
→ Replica 2: N2 (next clockwise)
→ Replica 3: N3 (next clockwise after N2)
These 3 nodes form the "preference list" for this key.
Preference List
The preference list is the ordered list of nodes responsible for storing a particular key. With virtual nodes, we skip vnodes belonging to the same physical node:
def get_preference_list(key, ring, N=3):
    """Return N distinct physical nodes for this key"""
    hash_val = hash(key)
    start = find_first_node_clockwise(ring, hash_val)
    preference_list = []
    seen_physical = set()
    node = start
    while len(preference_list) < N:
        physical = node.physical_node
        if physical not in seen_physical:
            preference_list.append(node)
            seen_physical.add(physical)
        node = next_clockwise(ring, node)
    return preference_list
Replication Strategy
Replicas can be placed using different strategies:
- Same datacenter — Lower latency but no protection against datacenter failures.
- Cross-datacenter (rack-aware) — Each replica in a different rack/AZ. Survives rack/AZ failures. DynamoDB and Cassandra use this.
- Cross-region — Replicas in different geographic regions. Highest durability but highest latency. Used for critical data.
Quorum Consensus
With data replicated across N nodes, how do we ensure consistency? The answer is quorum-based reads and writes.
The Parameters: N, W, R
| Parameter | Meaning | Typical Value |
|---|---|---|
| N | Total number of replicas | 3 |
| W | Write quorum — number of replicas that must acknowledge a write | 2 |
| R | Read quorum — number of replicas that must respond to a read | 2 |
The Quorum Condition
The guarantee comes from overlap: if W + R > N, the set of replicas that acknowledged a write and the set of replicas consulted on a read must share at least one node, so every read sees at least one copy of the most recent acknowledged write. If W + R ≤ N, the two sets can be disjoint, and stale reads become possible.
Tunable Consistency
The beauty of quorum-based systems is that consistency is tunable per-request:
| Configuration | W | R | W+R>N? | Trade-off |
|---|---|---|---|---|
| Strong consistency | 2 | 2 | 4 > 3 ✓ | Every read returns the latest write. Higher latency. |
| Write-heavy | 1 | 3 | 4 > 3 ✓ | Fast writes (only 1 ACK needed), slow reads (must read all 3). Good for write-heavy workloads. |
| Read-heavy | 3 | 1 | 4 > 3 ✓ | Slow writes (all 3 must ACK), fast reads (any 1 node). Good for read-heavy workloads. |
| Eventual consistency | 1 | 1 | 2 > 3 ✗ | Fastest reads and writes. Stale reads possible. Fine for caching/session data. |
| Strong write durability | 3 | 3 | 6 > 3 ✓ | Maximum safety — all replicas must agree on both reads and writes. Slowest but safest. |
DynamoDB, for example, defaults to eventually consistent reads but lets callers set ConsistentRead=true, which bumps R to a majority quorum. Writes always require W=2 for durability. This gives developers a per-request knob to trade consistency for latency.
▶ Quorum Read/Write
See how W=2 writes and R=2 reads overlap to guarantee the latest value is always seen (N=3).
Conflict Resolution: Vector Clocks
In an eventually consistent system, concurrent writes to the same key can happen on different replicas. We need a mechanism to detect and resolve these conflicts.
Why Timestamps Aren't Enough
Last-Write-Wins (LWW) using wall-clock timestamps seems simple but is fundamentally flawed:
- Clock skew — Different machines have slightly different clocks. Even with NTP, clocks can drift by milliseconds.
- Data loss — LWW silently discards one of the conflicting writes. If two users both update a shopping cart at the same time, one user's items disappear.
- Causality lost — Timestamps can't distinguish "concurrent" from "happened-before".
Vector Clocks
A vector clock is a list of (node, counter) pairs that tracks the causal history of a value. Each node increments its own counter when it writes.
Vector Clock: D([S1, v1], [S2, v2], ..., [Sn, vn])
Rules:
- When node Si writes, increment vi
- When node Si receives a write from Sj, merge: take max of each counter
Comparison:
- VC₁ < VC₂ (happened-before): every counter in VC₁ ≤ VC₂, at least one strictly less
- VC₁ ∥ VC₂ (concurrent): neither VC₁ < VC₂ nor VC₂ < VC₁
Example: Shopping Cart Conflict
Step 1: Client writes via node S1
Value: {eggs}
Clock: D([S1, 1])
Step 2: Client reads D([S1, 1]), writes via S1
Value: {eggs, milk}
Clock: D([S1, 2])
Step 3: Two concurrent updates (network partition between clients):
Client A reads D([S1, 2]), writes via S2:
Value: {eggs, milk, cheese}
Clock: D([S1, 2], [S2, 1])
Client B reads D([S1, 2]), writes via S3:
Value: {eggs, butter}
Clock: D([S1, 2], [S3, 1])
Step 4: Client reads both versions:
D([S1, 2], [S2, 1]) → {eggs, milk, cheese}
D([S1, 2], [S3, 1]) → {eggs, butter}
These are CONCURRENT (neither dominates the other).
Conflict! Client must resolve:
→ Merge: {eggs, milk, cheese, butter}
→ Write via S1: D([S1, 3], [S2, 1], [S3, 1])
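A minimal vector clock sketch that reproduces the rules and the scenario above. Clocks are plain dicts from node id to counter, and the dominates/concurrent/merge helpers are names invented for this sketch.

def dominates(a: dict, b: dict) -> bool:
    """True if clock a is at least b on every counter (a descends from b)."""
    return all(a.get(node, 0) >= count for node, count in b.items())

def concurrent(a: dict, b: dict) -> bool:
    return not dominates(a, b) and not dominates(b, a)

def merge(a: dict, b: dict) -> dict:
    """Element-wise max: the combined causal history of both versions."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}

# The shopping-cart scenario from steps 3-4:
va = {"S1": 2, "S2": 1}     # {eggs, milk, cheese}
vb = {"S1": 2, "S3": 1}     # {eggs, butter}
print(concurrent(va, vb))   # True → siblings, the client must merge

resolved = merge(va, vb)
resolved["S1"] = resolved.get("S1", 0) + 1   # client re-writes the merged cart via S1
print(resolved)             # {'S1': 3, 'S2': 1, 'S3': 1}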
Conflict Resolution Strategies
| Strategy | How It Works | Used By |
|---|---|---|
| Client-side resolution | Client receives all conflicting versions (siblings) and merges them. Most flexible but pushes complexity to the client. | Riak, original Dynamo |
| Last-Write-Wins (LWW) | Use timestamps; last write wins. Simple but loses data. | Cassandra (default), DynamoDB (default) |
| Application-specific merge | Custom merge function (e.g., union for sets, max for counters). Requires domain knowledge. | Riak CRDTs |
| CRDTs | Conflict-free Replicated Data Types: data structures that can be merged automatically without conflicts (G-Counter, OR-Set, etc.). | Riak, Redis (CRDT module) |
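To give a flavor of the CRDT row, here is a sketch of a G-Counter (grow-only counter): each node increments only its own slot, and merge is an element-wise max, which is commutative, associative, and idempotent, so replicas converge regardless of merge order. The class and method names are assumptions of this sketch.

class GCounter:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts = {}                      # node_id → increments observed on that node

    def increment(self, amount: int = 1):
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        return sum(self.counts.values())

    def merge(self, other: "GCounter"):
        """Element-wise max: safe to apply in any order, any number of times."""
        for node, count in other.counts.items():
            self.counts[node] = max(self.counts.get(node, 0), count)

a, b = GCounter("N1"), GCounter("N2")
a.increment(3)                # N1 sees 3 increments
b.increment(5)                # N2 sees 5 increments
a.merge(b); b.merge(a)
print(a.value(), b.value())   # 8 8, both replicas converge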
Write and Read Paths
Write Path
Client → Any Node (Coordinator) → Replicate to N nodes
Detailed steps:
1. Client sends put(key, value) to ANY node in the cluster.
(All nodes are equal — no master!)
2. Receiving node becomes the "coordinator" for this request.
It uses consistent hashing to identify the N replicas.
3. Coordinator sends the write to ALL N replicas in parallel.
4. Coordinator waits for W acknowledgments.
(W=2 for N=3 typically)
5. Once W ACKs received → respond SUCCESS to client.
The remaining N-W replicas will receive the write eventually.
6. Each replica's local write path:
a. Write to commit log / WAL (append-only, for durability)
b. Write to MemTable (in-memory sorted structure)
c. When MemTable is full → flush to SSTable on disk
d. Acknowledge the coordinator
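A sketch of the coordinator's side of steps 3-5: replicas are modeled as in-process objects with a put method (an assumption of this sketch), the write fans out in parallel via a thread pool, and the call returns as soon as W acknowledgments arrive. RPC, retries, and hinted handoff are omitted.

from concurrent.futures import ThreadPoolExecutor, as_completed

def quorum_write(replicas, key, value, W=2):
    """Fan the write out to all replicas in parallel; succeed once W have acked."""
    pool = ThreadPoolExecutor(max_workers=len(replicas))
    futures = [pool.submit(replica.put, key, value) for replica in replicas]
    acks = 0
    try:
        for future in as_completed(futures):
            try:
                future.result()
                acks += 1
                if acks >= W:
                    return True            # quorum reached; stragglers finish in background
            except Exception:
                pass                       # a failed replica simply doesn't count toward W
        return False                       # fewer than W acks: the write fails
    finally:
        pool.shutdown(wait=False)          # don't block on the slow replicas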
Read Path
Client → Any Node (Coordinator) → Read from R replicas → Resolve → Return
Detailed steps:
1. Client sends get(key) to ANY node.
2. Coordinator identifies the N replicas using consistent hashing.
3. Coordinator sends read requests to ALL N replicas in parallel.
4. Coordinator waits for R responses.
5. If all R responses have the same value:
→ Return the value to the client.
6. If responses differ (stale replicas):
→ Use vector clocks to determine the latest version.
→ If concurrent versions exist → return all siblings to client.
→ Trigger "read repair": send the latest value to stale replicas.
7. Each replica's local read path:
a. Check MemTable first (in-memory)
b. If not found → check Bloom filter for each SSTable
c. If Bloom filter says "maybe" → check SSTable on disk
d. Return the value (or not-found)
Read Repair
Read repair is an opportunistic consistency mechanism. During a read, if the coordinator detects that some replicas have stale data, it sends the latest version to those replicas. This gradually repairs inconsistencies without any background process.
Read Repair Example:
Key "user:42" has value v5 on N1, N2 but v3 (stale) on N3.
1. Coordinator reads from N1 (v5), N2 (v5), N3 (v3)
2. Coordinator sees N3 is stale
3. Returns v5 to client
4. Asynchronously sends v5 to N3 → N3 updates to v5
Result: Next read from N3 will return the correct value.
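A sketch of the read side with read repair folded in. To keep it short, each replica is assumed to return a (version, value) pair instead of a full vector clock, and replicas are contacted sequentially rather than in parallel.

def quorum_read(replicas, key, R=2):
    """Collect R responses, return the newest value, repair any stale replica seen."""
    responses = []
    for replica in replicas:                 # a real coordinator fans out in parallel
        try:
            responses.append((replica, replica.get(key)))
        except Exception:
            continue                         # unreachable replica: try the next one
        if len(responses) >= R:
            break
    if not responses:
        return None

    # Pick the newest (version, value) among the responses.
    _, (newest_version, newest_value) = max(responses, key=lambda item: item[1][0])

    # Read repair: push the newest version to any stale replica we heard from.
    for replica, (version, _) in responses:
        if version < newest_version:
            replica.put(key, (newest_version, newest_value))
    return newest_value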
Handling Temporary Failures
Nodes go down temporarily all the time: network blips, garbage collection pauses, rolling deployments. The system must remain available during these transient failures.
Sloppy Quorum
In a strict quorum, writes and reads must involve the exact nodes in the preference list. If one of those nodes is down, the operation fails. A sloppy quorum relaxes this: if a node in the preference list is unreachable, the request is sent to the next healthy node on the ring (not in the original preference list).
Strict Quorum (preference list: [N1, N2, N3]):
N2 is down → write to N1, N3 → only 2 nodes → W=2 satisfied ✓
But what if N2 AND N3 are down?
Strict: write fails! Only N1 is available.
Sloppy: write to N1, N4, N5 (next healthy nodes on ring) → W=2 satisfied ✓
Hinted Handoff
When a sloppy quorum sends data to a stand-in node (e.g., N4 holds data meant for N2), that node stores a hint — metadata indicating the intended recipient. When N2 comes back online, N4 forwards the data to N2 and deletes its local copy.
Hinted Handoff Flow:
1. N2 goes down.
2. Write for key K (preference list: [N1, N2, N3]):
→ Coordinator sends to N1, N4 (stand-in for N2), N3
→ N4 stores: { key: K, value: V, hint: "intended for N2" }
3. N2 comes back online.
4. N4 detects N2 is alive (via gossip protocol).
5. N4 sends the hinted data to N2.
6. N2 stores the data normally.
7. N4 deletes the hint.
Result: No data loss, no unavailability, eventual consistency maintained.
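A sketch of the stand-in node's role: hinted writes are queued per intended recipient and drained when gossip reports that node healthy again. The put_with_hint/handoff names and the send RPC callback are assumptions of this sketch; persisting the hint store and expiring old hints are omitted.

from collections import defaultdict

class StandInNode:
    def __init__(self):
        self.store = {}
        self.hints = defaultdict(list)        # intended_node → [(key, value), ...]

    def put_with_hint(self, key, value, intended_node):
        """Accept a write on behalf of an unreachable preference-list node."""
        self.store[key] = value
        self.hints[intended_node].append((key, value))

    def handoff(self, node_id, send):
        """Called when gossip marks node_id alive; send(node_id, key, value) is the RPC."""
        for key, value in self.hints.pop(node_id, []):
            send(node_id, key, value)         # forward the hinted write
            self.store.pop(key, None)         # then drop the local copy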
Handling Permanent Failures: Merkle Trees
Hinted handoff handles temporary failures. But what about permanent failures — a disk dies, a node is decommissioned, or data gets corrupted? We need a way to detect and repair long-term data divergence between replicas.
The Problem
Comparing all keys between two replicas is expensive. If each replica has billions of keys, sending all keys over the network is impractical. We need a way to efficiently identify exactly which keys differ.
Anti-Entropy with Merkle Trees
A Merkle tree (hash tree) is a binary tree where each leaf contains the hash of a data block, and each internal node contains the hash of its children. This allows efficient comparison:
Merkle Tree Structure:
Root Hash
/ \
Hash(L+R) Hash(L+R)
/ \ / \
Hash(K1) Hash(K2) Hash(K3) Hash(K4)
| | | |
K1:V1 K2:V2 K3:V3 K4:V4
Key range is divided into buckets.
Each leaf = hash of all key-value pairs in that bucket.
Each parent = hash(left_child || right_child).
Comparison Algorithm
- Compare roots: If root hashes match → replicas are identical. Done! (O(1))
- Roots differ: Compare left and right children.
- Recursively descend into subtrees with different hashes.
- At the leaves: We've identified the exact key ranges that differ.
- Sync only the divergent ranges.
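A sketch of building a Merkle tree over fixed key-range buckets and recursively diffing two trees to find the divergent buckets. Real systems hash serialized key-value pairs per range and rebuild trees incrementally; this sketch assumes a power-of-two number of buckets so levels pair up cleanly.

import hashlib

def h(data: str) -> str:
    return hashlib.sha256(data.encode()).hexdigest()

def build_merkle(buckets):
    """buckets: list of dicts, one per key range. Returns tree levels, leaves first."""
    level = [h(repr(sorted(b.items()))) for b in buckets]
    levels = [level]
    while len(level) > 1:
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels

def diff(tree_a, tree_b, level=None, idx=0):
    """Return indices of leaf buckets whose hashes differ (trees must have the same shape)."""
    if level is None:
        level = len(tree_a) - 1                      # start at the root
    if tree_a[level][idx] == tree_b[level][idx]:
        return []                                    # identical subtree, skip it
    if level == 0:
        return [idx]                                 # divergent leaf bucket found
    return diff(tree_a, tree_b, level - 1, 2 * idx) + \
           diff(tree_a, tree_b, level - 1, 2 * idx + 1)

replica_a = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}, {"k4": "v4"}]
replica_b = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "STALE"}, {"k4": "v4"}]
print(diff(build_merkle(replica_a), build_merkle(replica_b)))   # [2] → only bucket 2 needs syncing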
Merkle Tree Anti-Entropy Process
Anti-Entropy Repair:
1. Each node maintains a Merkle tree per key range it owns.
2. Periodically (e.g., every 10 minutes), a node picks a random
replica and initiates a Merkle tree comparison.
3. Exchange root hashes:
Node A root: 0xABCD
Node B root: 0xABCF ← different!
4. Exchange children hashes:
A.left = 0x1234, B.left = 0x1234 ← same (skip)
A.right = 0x5678, B.right = 0x5679 ← different (descend)
5. Continue descending until divergent leaves are found.
6. Node with stale data requests the correct key-value pairs.
7. Trees are rebuilt when data changes (incrementally or periodically).
Cassandra runs this process via nodetool repair. During repair, it builds Merkle trees by scanning all SSTables, compares them between replicas, and streams only the differences. This is critical after replacing a dead node or recovering from long outages.
▶ Merkle Tree Anti-Entropy
Two replicas compare Merkle tree hashes level by level, narrowing down to the exact divergent data range.
Failure Detection: Gossip Protocol
In a decentralized system with no master, how does a node know which other nodes are alive or dead? The answer is the gossip protocol, an epidemic-style communication mechanism.
How Gossip Works
- Each node maintains a membership list: a table of all known nodes and their heartbeat counters.
- Periodically (e.g., every 1 second), each node increments its own heartbeat counter.
- Each node randomly selects another node and sends its entire membership list.
- The receiving node merges the lists: for each node, keep the higher heartbeat counter.
- If a node's heartbeat hasn't been updated for a threshold period (e.g., 30 seconds), it's marked as suspected failed.
- After another timeout (e.g., 60 seconds without recovery), it's marked as permanently failed and removed.
Gossip Protocol Example:
Time 0: Node A membership list:
A: heartbeat=100, timestamp=T0
B: heartbeat=95, timestamp=T0
C: heartbeat=80, timestamp=T0-5s
D: heartbeat=70, timestamp=T0-40s ← suspected failed!
Time 1: A randomly picks B and sends its list.
B merges: keeps higher heartbeat for each node.
B now knows D might be failed too.
Time 2: B gossips to C with merged list.
C now also knows about D's potential failure.
→ Within O(log N) gossip rounds, ALL nodes know D is suspected failed.
(This is the mathematical beauty of epidemic protocols.)
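A sketch of one node's bookkeeping for this protocol: it bumps its own heartbeat each tick, gossips its membership table to one random peer, and on receipt keeps the higher heartbeat per node. Transport and peer sampling are simplified, and the T1/T2 thresholds below mirror the state machine in the next subsection.

import random
import time

SUSPECT_AFTER = 30                    # T1: no heartbeat update for 30 s → suspected
FAIL_AFTER = SUSPECT_AFTER + 60       # T2: a further 60 s without recovery → failed

class GossipNode:
    def __init__(self, node_id, peers):
        self.node_id = node_id
        self.peers = peers
        now = time.time()
        self.members = {p: {"heartbeat": 0, "updated": now} for p in peers + [node_id]}

    def tick(self):
        """Called every second: bump own heartbeat, pick one random peer to gossip to."""
        self.members[self.node_id]["heartbeat"] += 1
        self.members[self.node_id]["updated"] = time.time()
        return random.choice(self.peers), dict(self.members)   # (target, digest to send)

    def receive(self, digest):
        """Merge a peer's membership list: keep the higher heartbeat per node."""
        now = time.time()
        for node, entry in digest.items():
            mine = self.members.setdefault(node, {"heartbeat": -1, "updated": now})
            if entry["heartbeat"] > mine["heartbeat"]:
                mine["heartbeat"] = entry["heartbeat"]
                mine["updated"] = now

    def status(self, node):
        age = time.time() - self.members[node]["updated"]
        if age > FAIL_AFTER:
            return "FAILED"
        return "SUSPECTED" if age > SUSPECT_AFTER else "ALIVE"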
Failure Detection States
State Machine:
ALIVE → SUSPECTED → FAILED → REMOVED
(no heartbeat (timeout (cleanup)
for T1 sec) for T2 sec)
Advantages of Gossip
- Decentralized: No single point of failure. No master election needed.
- Scalable: Each node only communicates with a few random peers per round. Network overhead is O(N) per round across the entire cluster.
- Convergent: Information spreads exponentially. With N nodes, full propagation takes O(log N) rounds.
- Robust: Tolerates message loss, node failures, and network partitions gracefully.
Coordinator and Request Routing
In a Dynamo-style system, any node can serve any request. There's no master/slave distinction. This is fundamental to the availability guarantee.
Request Flow
Two routing strategies:
1. Client-side routing (used by DynamoDB):
Client knows the ring topology (via partition map).
Client sends request directly to the coordinator.
→ Lowest latency (one network hop to the coordinator, which fans out to the remaining N-1 replicas in parallel)
2. Generic load balancer (simpler):
Client → Load Balancer → Random Node → Forward to Coordinator
→ Extra hop but client doesn't need ring knowledge
Coordinator responsibilities:
- Identify the N replicas for the key
- Send read/write to all N replicas in parallel
- Wait for W/R responses
- Handle conflict resolution (for reads)
- Trigger read repair (for stale replicas)
- Return result to client
Request Routing with Partition Map
# Partition map: every node knows which ranges map to which nodes
partition_map = {
    (0, 1000): ['N1', 'N2', 'N3'],
    (1001, 2000): ['N2', 'N3', 'N4'],
    (2001, 3000): ['N3', 'N4', 'N5'],
    ...
}

def route_request(key):
    token = hash(key) % RING_SIZE
    for (start, end), nodes in partition_map.items():
        if start <= token <= end:
            coordinator = nodes[0]  # first in preference list
            return coordinator
Complete Architecture
Let's bring all the components together:
┌────────────────────────────────────────────────────────────────────┐
│ CLIENT LAYER │
│ Client → (Partition Map) → Choose Coordinator Node │
└──────────────────────────────┬─────────────────────────────────────┘
│
┌──────────────────────────────▼─────────────────────────────────────┐
│ COORDINATOR NODE │
│ ┌───────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Request Router │ │ Quorum Logic │ │ Conflict Resolution │ │
│ │ (hash ring │ │ (W, R params)│ │ (vector clocks, │ │
│ │ + pref list) │ │ │ │ read repair) │ │
│ └───────────────┘ └──────────────┘ └──────────────────────┘ │
└──────────────────────────────┬─────────────────────────────────────┘
│ Replicate to N nodes
┌──────────────────────┼──────────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ REPLICA 1 │ │ REPLICA 2 │ │ REPLICA 3 │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │ WAL │ │ │ │ WAL │ │ │ │ WAL │ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │MemTable│ │ │ │MemTable│ │ │ │MemTable│ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │SSTables│ │ │ │SSTables│ │ │ │SSTables│ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │
│ ┌────────┐ │ │ ┌────────┐ │ │ ┌────────┐ │
│ │ Bloom │ │ │ │ Bloom │ │ │ │ Bloom │ │
│ │Filters │ │ │ │Filters │ │ │ │Filters │ │
│ └────────┘ │ │ └────────┘ │ │ └────────┘ │
└──────────────┘ └──────────────┘ └──────────────┘
Background Processes (on every node):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Gossip │ │ Anti-Entropy│ │ Compaction │
│ Protocol │ │ (Merkle │ │ (SSTable │
│ (failure │ │ trees) │ │ merge) │
│ detection) │ │ │ │ │
└──────────────┘ └──────────────┘ └──────────────┘
┌──────────────┐ ┌──────────────┐
│ Hinted │ │ Membership │
│ Handoff │ │ Management │
│ (temp fail │ │ (ring │
│ recovery) │ │ changes) │
└──────────────┘ └──────────────┘
Failure Scenarios & Handling
Scenario 1: Single Node Failure (Transient)
Problem: Node N2 crashes and restarts after 5 minutes.
Solution:
1. Gossip protocol detects N2 is unresponsive within ~30 seconds.
2. Writes to N2's keys use sloppy quorum → sent to N4 (stand-in).
3. N4 stores data with hinted handoff metadata.
4. N2 restarts, gossip detects it's alive.
5. N4 forwards hinted data to N2.
6. N2 is fully caught up. No data lost.
Scenario 2: Permanent Node Failure
Problem: Node N2's disk dies. N2 is replaced with a new empty node.
Solution:
1. New N2 joins the ring at N2's position (or gets reassigned vnodes).
2. Anti-entropy process kicks in:
a. New N2 builds empty Merkle trees.
b. Compares with N1 and N3 → massive divergence.
c. Streams all data for N2's key ranges from N1 and N3.
3. Full data transfer may take hours for large datasets.
4. During transfer, sloppy quorum ensures availability.
5. Once synchronized, N2 is fully operational.
Scenario 3: Network Partition
Problem: Network partition splits cluster into two groups:
Group A: [N1, N2] Group B: [N3, N4, N5]
For a key with preference list [N1, N3, N5]:
- Group A can reach N1 but not N3 or N5
- Group B can reach N3 and N5 but not N1
With sloppy quorum:
- Group A: write to N1 + N2 (stand-in for N3) → W=2 satisfied ✓
- Group B: write to N3 + N5 → W=2 satisfied ✓
→ Both sides can write! But we now have CONFLICTING versions.
After partition heals:
- Anti-entropy detects divergence via Merkle trees.
- Vector clocks identify concurrent writes.
- Conflict resolution kicks in (client-side merge or LWW).
→ Data converges to a single consistent state.
Scenario 4: Datacenter Failure
Problem: Entire datacenter (AZ) goes offline.
Solution:
- Cross-AZ replication ensures N replicas span multiple AZs.
- Remaining AZs have enough replicas for quorum (if N ≥ 3 and AZs ≥ 3).
- Sloppy quorum uses healthy nodes in other AZs.
- When AZ recovers, hinted handoff + anti-entropy repair data.
- DynamoDB: 3 replicas across 3 AZs in a region by default.
| Failure Type | Detection | Availability Mechanism | Repair Mechanism |
|---|---|---|---|
| Transient node failure | Gossip (heartbeat timeout) | Sloppy quorum | Hinted handoff |
| Permanent node failure | Gossip (extended timeout) | Sloppy quorum | Merkle tree anti-entropy |
| Network partition | Gossip (asymmetric detection) | Sloppy quorum both sides | Vector clock conflict resolution |
| Datacenter failure | Cross-DC gossip | Cross-AZ replication | Hinted handoff + anti-entropy |
Performance Optimizations
Write Path Optimizations
- Batched WAL writes: Group multiple writes into a single fsync call. Amortizes the expensive disk sync across many operations. DynamoDB batches writes in 4 KB pages.
- Bloom filters on SSTables: Before reading an SSTable from disk, check its Bloom filter, a small in-memory structure that answers "definitely not here" or "maybe here". With ~10 bits per key the false-positive rate is roughly 1% (a sketch follows this list).
- Compression: SSTables are compressed (LZ4, Snappy, or Zstd). Reduces disk I/O at the cost of CPU. Net positive because disk is usually the bottleneck.
- Parallel replication: Coordinator sends writes to all N replicas simultaneously, not sequentially. Latency = max(latency of W fastest replicas).
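The Bloom filter check from the list above is small enough to sketch in full: k hash functions set k bits per key, membership tests can return false positives but never false negatives, and ~10 bits per key with ~7 hashes lands near a 1% false-positive rate. The class below is a toy version for illustration only.

import hashlib

class BloomFilter:
    """Tiny Bloom filter; real stores size it per SSTable at build time."""

    def __init__(self, num_bits=10_000, num_hashes=7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0                         # a Python int doubles as the bit array

    def _positions(self, key: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, key: str):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        """False → definitely absent (skip the SSTable); True → maybe present (read disk)."""
        return all(self.bits & (1 << pos) for pos in self._positions(key))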
Read Path Optimizations
- Block cache: Cache frequently-read SSTable blocks in memory. Serves hot data without disk I/O.
- Row cache: Cache entire rows for extremely hot keys (e.g., celebrity profiles).
- Speculative reads: If the replicas contacted first haven't responded within a threshold (e.g., the p99 latency), send an additional request to another replica and use whichever response arrives first. This reduces tail latency at the cost of slightly more network traffic.
- Read repair probability: Don't trigger read repair on every read (expensive). Use a probability (e.g., 10%) to reduce overhead while still eventually repairing stale data.
Compaction Strategies
| Strategy | Description | Best For |
|---|---|---|
| Size-tiered (STCS) | Merge SSTables of similar size. Write-optimized. Higher space amplification. | Write-heavy workloads |
| Leveled (LCS) | Organize SSTables into levels. Each level is 10× the previous. Read-optimized. Lower space amplification. | Read-heavy workloads |
| FIFO | Delete oldest SSTables. For time-series data with TTL. | Time-series / TTL data |
Real-World Systems Comparison
| Feature | DynamoDB | Cassandra | Riak |
|---|---|---|---|
| Partitioning | Consistent hashing + adaptive capacity | Consistent hashing + vnodes | Consistent hashing + vnodes |
| Replication | 3 replicas across 3 AZs | Configurable N (default 3) | Configurable N (default 3) |
| Consistency | Eventual (default) + strong (optional) | Tunable (ANY to ALL) | Tunable (N, R, W) |
| Conflict resolution | LWW (default) + conditional writes | LWW (timestamps) | Vector clocks + siblings |
| Failure detection | Gossip + external monitoring | Gossip (Phi accrual detector) | Gossip |
| Anti-entropy | Proprietary (continuous) | Merkle trees (nodetool repair) | Merkle trees (active anti-entropy) |
| Write path | WAL + B-tree (not LSM) | WAL + MemTable + SSTables (LSM) | WAL + LSM (Bitcask/LevelDB) |
| Managed? | Fully managed (AWS) | Self-managed or DataStax Astra | Self-managed |
Unlike the original Dynamo design, DynamoDB does not expose vector clocks to clients; it defaults to last-write-wins and offers conditional writes (ConditionExpression) for conflict-free writes. This simplifies the client code at the cost of flexibility. For shopping-cart-like use cases, DynamoDB now recommends using transactions (TransactWriteItems) instead of vector clock merges.
Summary & Interview Cheat Sheet
| Component | Technique | Purpose |
|---|---|---|
| Data partitioning | Consistent hashing + virtual nodes | Even distribution, minimal remapping |
| Data replication | N replicas on clockwise ring nodes | High availability, durability |
| Consistency | Quorum consensus (W + R > N) | Tunable consistency per request |
| Conflict detection | Vector clocks | Detect concurrent writes without data loss |
| Conflict resolution | Client-side merge / LWW / CRDTs | Reconcile divergent replicas |
| Failure detection | Gossip protocol | Decentralized, scalable liveness checks |
| Temporary failures | Sloppy quorum + hinted handoff | "Always writable" — no downtime |
| Permanent failures | Merkle tree anti-entropy | Efficient data repair between replicas |
| Storage engine | LSM tree (WAL + MemTable + SSTables) | High write throughput, crash recovery |
| Read optimization | Bloom filters + block cache | Reduce unnecessary disk reads |
Key Interview Tips
✅ Do
- Start with requirements and scale estimation
- Begin with single-server design, then distribute
- Explain the CAP trade-off and justify choosing AP
- Draw the hash ring and show replication
- Explain quorum math: W + R > N
- Discuss failure handling (the core of this design)
- Mention specific numbers: N=3, W=2, R=2
❌ Don't
- Jump to distributed design without single-server first
- Forget to explain virtual nodes with consistent hashing
- Hand-wave on conflict resolution — it's critical
- Ignore failure scenarios (interviewers will ask)
- Confuse sloppy quorum with strict quorum
- Skip Merkle trees — they differentiate strong answers
- Say "just use DynamoDB" without explaining how it works