High Level Design Series · Building Blocks · Part 2 · Post 12 of 70

Database Sharding & Partitioning

Why Shard?

Every database has limits. A single PostgreSQL instance can comfortably handle a few terabytes of data and a few thousand connections. Beyond that, you hit walls:

Bottleneck | Symptom | Scale Limit
Storage | Disk full, backups take hours | ~2–10 TB practical limit
Write throughput | WAL bottleneck, lock contention, replication lag | ~50K–100K writes/sec
Connection limits | Connection pool exhaustion, "too many clients" | ~5K–10K connections
Query performance | Table scans on billions of rows | Indexes grow too large for RAM
Availability | Single point of failure | One region, one machine

Vertical scaling (bigger machine) can only take you so far. At some point you need horizontal scaling: splitting data across multiple database instances, each handling a subset of the load. This is sharding.

When is sharding premature? Sharding adds enormous complexity — cross-shard queries, distributed transactions, operational burden. Exhaust these first:
  • Read replicas — offload read traffic
  • Caching (Redis/Memcached) — reduce DB hits
  • Connection pooling (PgBouncer) — handle more connections
  • Query optimization — proper indexes, query rewriting
  • Vertical scaling — bigger instance (cheapest engineering time)
Sharding is a last resort, not a first choice. But when you need it, you really need it.

Horizontal vs Vertical Partitioning

These are fundamentally different approaches to splitting data:

Horizontal Partitioning (Sharding)

Split rows across multiple databases. Each shard has the same schema but holds a different subset of rows.

-- All shards have identical schema:
CREATE TABLE orders (
    order_id   BIGINT PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    amount     DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT NOW()
);

-- Shard 0: orders where user_id % 4 = 0
-- Shard 1: orders where user_id % 4 = 1
-- Shard 2: orders where user_id % 4 = 2
-- Shard 3: orders where user_id % 4 = 3

Vertical Partitioning

Split columns or tables across databases, typically by feature domain. Each database has a different schema.

-- User DB (handles auth & profiles)
CREATE TABLE users (
    user_id  BIGINT PRIMARY KEY,
    email    VARCHAR(255),
    password_hash VARCHAR(255),
    name     VARCHAR(100)
);

-- Order DB (handles e-commerce)
CREATE TABLE orders (
    order_id   BIGINT PRIMARY KEY,
    user_id    BIGINT NOT NULL,  -- FK is now cross-DB, enforced at app level
    product_id BIGINT NOT NULL,
    amount     DECIMAL(10,2)
);

-- Analytics DB (handles reporting)
CREATE TABLE page_views (
    view_id    BIGINT PRIMARY KEY,
    user_id    BIGINT,
    page_url   TEXT,
    viewed_at  TIMESTAMP
);
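
Because the foreign key from orders to users now crosses database boundaries, joins happen in application code. Below is a minimal sketch of such an app-level join, assuming two separate connections (user_db, order_db) created with the same get_db_connection helper used later in this post and an execute method that returns a dict per row:

user_db = get_db_connection("user-db.internal")
order_db = get_db_connection("order-db.internal")

def get_order_with_user(order_id: int) -> dict:
    """App-level 'join': one query per database, stitched together in code."""
    order = order_db.execute(
        "SELECT order_id, user_id, product_id, amount FROM orders WHERE order_id = %s",
        (order_id,)
    )
    user = user_db.execute(
        "SELECT user_id, name, email FROM users WHERE user_id = %s",
        (order['user_id'],)
    )
    # The "FK" is enforced here: reject orders that point at a missing user
    if user is None:
        raise ValueError(f"Order {order_id} references unknown user {order['user_id']}")
    return {**order, "user_name": user['name'], "user_email": user['email']}
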
Aspect | Horizontal (Sharding) | Vertical
What's split | Rows of the same table | Different tables/columns
Schema per shard | Identical | Different
Scales | Write throughput + storage linearly | Isolates load by domain
Complexity | Shard routing, cross-shard queries | Cross-DB joins, distributed TX
When to use | Single table too large/hot | Feature isolation, microservices

In practice, most systems use vertical partitioning first (split by microservice) and add horizontal sharding later within each service when a single table grows too large.

Sharding Strategies

Hash-Based (Key-Based) Sharding

Apply a hash function to the shard key and use modulo to determine the shard:

shard_id = hash(shard_key) % num_shards

-- Example with 4 shards:
-- user_id=101 → hash(101) % 4 = 1 → Shard 1
-- user_id=205 → hash(205) % 4 = 3 → Shard 3
-- user_id=307 → hash(307) % 4 = 3 → Shard 3
-- user_id=412 → hash(412) % 4 = 0 → Shard 0

In application code (Python example):

import hashlib

NUM_SHARDS = 4
shard_connections = {
    0: get_db_connection("shard-0.db.internal"),
    1: get_db_connection("shard-1.db.internal"),
    2: get_db_connection("shard-2.db.internal"),
    3: get_db_connection("shard-3.db.internal"),
}

def get_shard(user_id: int) -> int:
    """Deterministic shard assignment using hash."""
    h = hashlib.md5(str(user_id).encode()).hexdigest()
    return int(h, 16) % NUM_SHARDS

def insert_order(user_id, product_id, amount):
    shard_id = get_shard(user_id)
    conn = shard_connections[shard_id]
    conn.execute(
        "INSERT INTO orders (user_id, product_id, amount) VALUES (%s, %s, %s)",
        (user_id, product_id, amount)
    )

def get_user_orders(user_id):
    shard_id = get_shard(user_id)
    conn = shard_connections[shard_id]
    return conn.execute(
        "SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC",
        (user_id,)
    )

Pros: Even data distribution (with good hash function), simple to implement, O(1) lookup.
Cons: Range queries impossible without scatter-gather. Resharding is painful — changing num_shards moves almost all data (see animation below).
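
To see why resharding hurts, here is a small sketch (the key range is arbitrary) that counts how many keys change shards when modulo-based routing goes from 4 to 5 shards:

import hashlib

def shard_for(key: int, num_shards: int) -> int:
    h = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    return h % num_shards

keys = range(100_000)
moved = sum(1 for k in keys if shard_for(k, 4) != shard_for(k, 5))
print(f"{moved / len(keys):.0%} of keys changed shards")  # roughly 80% move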

[Interactive demo: Hash-Based Sharding. Watch how user IDs are distributed across shards, and what happens when you add a 5th shard.]

Range-Based Sharding

Assign contiguous ranges of the shard key to different shards:

-- Range-based sharding by user_id
Shard 0: user_id     1 – 1,000,000
Shard 1: user_id 1,000,001 – 2,000,000
Shard 2: user_id 2,000,001 – 3,000,000
Shard 3: user_id 3,000,001 – 4,000,000

-- Or by timestamp (time-series data)
Shard 0: created_at in 2024-Q1
Shard 1: created_at in 2024-Q2
Shard 2: created_at in 2024-Q3
Shard 3: created_at in 2024-Q4

Range routing logic:

def get_shard_range(user_id: int) -> int:
    """Route to shard based on ID range."""
    RANGE_SIZE = 1_000_000
    return (user_id - 1) // RANGE_SIZE

# Alphabetical range for usernames
def get_shard_alpha(username: str) -> int:
    first_char = username[0].upper()
    if first_char <= 'F':   return 0  # A-F → Shard 0
    elif first_char <= 'L': return 1  # G-L → Shard 1
    elif first_char <= 'R': return 2  # M-R → Shard 2
    else:                   return 3  # S-Z → Shard 3

Pros: Range queries stay on one shard (huge win for time-series). Easy to understand. Easy to split a hot shard by adjusting boundaries.
Cons: Uneven distribution (hotspots) if key distribution is skewed. New data clusters on the "latest" shard for monotonically increasing keys.

[Interactive demo: Range-Based Sharding and the Hotspot Problem. See how alphabetical range sharding creates hotspots and how rebalancing fixes them.]

Directory-Based Sharding

Maintain a lookup table that maps each key (or key range) to a shard:

-- Lookup table (stored in a fast, replicated metadata DB)
CREATE TABLE shard_directory (
    shard_key_min  BIGINT,
    shard_key_max  BIGINT,
    shard_id       INT,
    PRIMARY KEY (shard_key_min)
);

INSERT INTO shard_directory VALUES
    (1,       500000,  0),
    (500001,  1200000, 1),  -- larger range: this shard is beefy
    (1200001, 1800000, 2),
    (1800001, 2500000, 3);

-- Routing query
SELECT shard_id FROM shard_directory
WHERE %s BETWEEN shard_key_min AND shard_key_max;

class DirectoryRouter:
    def __init__(self):
        self.ranges = []  # list of (min_key, max_key, shard_id) tuples
        self._load_from_db()

    def _load_from_db(self):
        """Cache the directory in memory for fast lookups."""
        rows = metadata_db.execute("SELECT shard_key_min, shard_key_max, shard_id FROM shard_directory")
        for row in rows:
            self.ranges.append((row['shard_key_min'], row['shard_key_max'], row['shard_id']))

    def get_shard(self, key: int) -> int:
        for min_key, max_key, shard_id in self.ranges:
            if min_key <= key <= max_key:
                return shard_id
        raise ValueError(f"No shard found for key {key}")

    def move_range(self, min_key, max_key, new_shard_id):
        """Resharding: update the directory to point a range to a new shard."""
        metadata_db.execute(
            "UPDATE shard_directory SET shard_id = %s WHERE shard_key_min = %s",
            (new_shard_id, min_key)
        )
        self._load_from_db()

Pros: Maximum flexibility — move any key to any shard. Resharding is just a directory update. No hash function constraints.
Cons: The directory is a single point of failure and a bottleneck. Must be fast (in-memory cache) and highly available (replicated).

Geographic Sharding

Route data to shards based on geographic region:

-- Geographic shard assignment
Shard EU:  users in Europe        → eu-west-1.db.internal
Shard US:  users in North America → us-east-1.db.internal
Shard APAC: users in Asia-Pacific → ap-southeast-1.db.internal

def get_shard_geo(user_country: str) -> str:
    EU_COUNTRIES = {'DE', 'FR', 'GB', 'ES', 'IT', 'NL', ...}
    APAC_COUNTRIES = {'JP', 'KR', 'AU', 'IN', 'SG', ...}
    if user_country in EU_COUNTRIES:
        return "shard-eu"
    elif user_country in APAC_COUNTRIES:
        return "shard-apac"
    else:
        return "shard-us"  # default

Pros: Low latency (data near users), GDPR/data residency compliance, natural disaster isolation.
Cons: Uneven distribution (US has more users than APAC for many services). Cross-region queries are slow.

Strategy Comparison

Strategy | Distribution | Range Queries | Resharding | Best For
Hash-based | ★★★ Even | ★ Poor | ★ Painful | Key-value lookups
Range-based | ★★ Depends | ★★★ Great | ★★ Split ranges | Time-series, sorted data
Directory-based | ★★★ Flexible | ★★ Moderate | ★★★ Easy | Complex routing needs
Geographic | ★ Uneven | ★★ Regional | ★★ Add regions | Compliance, low latency

Shard Key Selection

This is the single most important decision in sharding. A bad shard key can make your system worse than a single database. The ideal shard key satisfies three criteria:

  1. High cardinality — many distinct values so data distributes evenly. user_id (millions of values) is good; country (200 values, uneven) is bad.
  2. Even distribution — values should be uniformly distributed across the key space. Monotonically increasing IDs push all writes to the latest shard.
  3. Query locality — most queries should hit a single shard. If you shard orders by user_id, then "get all orders for user X" hits one shard. If you shard by order_id, that query scatters across all shards.

Good vs Bad Shard Keys — Detailed Analysis

Shard Key | Cardinality | Distribution | Query Locality | Verdict
user_id | ★★★ Millions | ★★★ Even (hashed) | ★★★ User queries | ✓ Excellent
order_id (UUID) | ★★★ Unlimited | ★★★ Perfect | ★ Poor for user queries | ⚠ Depends on access
country | ★ ~200 | ★ Very skewed | ★★ Geo queries | ✗ Bad: US shard overloaded
created_at | ★★★ High | ★ Append-only hotspot | ★★★ Time ranges | ✗ Bad: all writes to latest shard
status | ★ 3–5 values | ★ Very skewed | ★ Rarely queried alone | ✗ Terrible

Compound Shard Keys

Sometimes no single field works well. Use compound keys:

// MongoDB compound shard key example (mongo shell)
db.orders.createIndex({ "region": 1, "user_id": 1 })
sh.shardCollection("mydb.orders", { "region": 1, "user_id": 1 })

// This allows:
// 1. Geo-local queries: all data for region="US" on nearby shards
// 2. User queries: within a region, one user's data is co-located
// 3. Better distribution than region alone
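
Outside MongoDB, a compound key can be approximated at the application layer. A hedged sketch, assuming the region picks a shard group and user_id is hashed within that group (REGION_SHARDS and the group sizes are illustrative):

import hashlib

# Each region owns its own pool of shards (illustrative layout)
REGION_SHARDS = {
    "US":   ["us-0", "us-1", "us-2", "us-3"],
    "EU":   ["eu-0", "eu-1"],
    "APAC": ["ap-0", "ap-1"],
}

def get_shard_compound(region: str, user_id: int) -> str:
    """Route by (region, user_id): region picks the shard group, user_id picks the shard."""
    shards = REGION_SHARDS[region]
    h = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16)
    return shards[h % len(shards)]

print(get_shard_compound("EU", 12345))  # e.g. "eu-1"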

The Celebrity Problem (Hot Keys)

Even with a good shard key like user_id, some users are "hot" — a celebrity with 100M followers generates vastly more read traffic. Solutions:

# Option 1: Add a random suffix to spread the celebrity's data
import random
shard_key = f"{user_id}_{random.randint(0, 9)}"
# user 12345 becomes 12345_0 through 12345_9 → spread over 10 sub-shards
# Reads must fan out to all 10 sub-shards

# Option 2: Dedicated shard for hot keys
HOT_USERS = {12345: "shard-celebrity-1", 67890: "shard-celebrity-2"}
def get_shard(user_id):
    if user_id in HOT_USERS:
        return HOT_USERS[user_id]
    return hash_based_shard(user_id)

# Option 3: Cache the hot key's data aggressively
# Read from cache, write to shard + invalidate cache

Cross-Shard Queries

This is the biggest operational challenge of sharding. Any query that can't be routed to a single shard becomes expensive.

Scatter-Gather Pattern

Query all shards, merge results in the application layer:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=len(shard_connections))

def get_top_orders_by_amount(limit=10):
    """Scatter-gather: query all shards, merge results in the application."""
    all_results = []

    # Scatter: send the same query to every shard in parallel
    futures = []
    for shard_id, conn in shard_connections.items():
        future = executor.submit(
            conn.execute,
            "SELECT * FROM orders ORDER BY amount DESC LIMIT %s",
            (limit,)
        )
        futures.append(future)

    # Gather: collect each shard's top-N rows
    for future in futures:
        all_results.extend(future.result())

    # Merge: sort the combined rows and truncate to the global top-N
    all_results.sort(key=lambda x: x['amount'], reverse=True)
    return all_results[:limit]

Performance impact: Scatter-gather latency = max(shard latencies) + merge time. With 64 shards, one slow shard delays the entire query, and the probability that at least one shard is slow grows with the shard count, so p99 tail latency degrades sharply.

Cross-Shard Joins (Avoid Them)

JOINs across shards are extremely expensive. Instead, denormalize:

-- BAD: Cross-shard join (user on Shard 2, orders on Shard 7)
SELECT u.name, o.amount
FROM users u JOIN orders o ON u.user_id = o.user_id
WHERE o.order_id = 12345;

-- GOOD: Denormalize — store user_name in the orders table
ALTER TABLE orders ADD COLUMN user_name VARCHAR(100);

-- Now there is no cross-shard join; it's a single query against the orders table:
SELECT user_name, amount FROM orders WHERE order_id = 12345;

Denormalization trade-off: You trade storage (duplicated data) and write complexity (must update user_name in orders when it changes) for read performance. In sharded systems, this trade-off is almost always worth it.
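
That write-side cost shows up whenever the duplicated field changes. A sketch of propagating a name change into the denormalized orders rows, assuming both tables are sharded by user_id so the update stays on one shard (helper names reuse the routing code from earlier):

def rename_user(user_id: int, new_name: str):
    """Update the source of truth, then the denormalized copies."""
    conn = shard_connections[get_shard(user_id)]

    # 1. Update the canonical record on the user's shard
    conn.execute("UPDATE users SET name = %s WHERE user_id = %s", (new_name, user_id))

    # 2. Update the denormalized copy; because orders are sharded by user_id,
    #    all of this user's orders live on the same shard as the user row
    conn.execute("UPDATE orders SET user_name = %s WHERE user_id = %s", (new_name, user_id))
    # If copies live on other shards, enqueue an async job or CDC event instead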

Cross-Shard Transactions

ACID transactions across shards are very hard. Two approaches:

Two-Phase Commit (2PC):

-- Phase 1: PREPARE — each shard votes
Shard 1: PREPARE TRANSACTION 'tx-001';  → OK
Shard 3: PREPARE TRANSACTION 'tx-001';  → OK

-- Phase 2: COMMIT — if all voted OK
Shard 1: COMMIT PREPARED 'tx-001';
Shard 3: COMMIT PREPARED 'tx-001';

-- If any shard fails:
Shard 1: ROLLBACK PREPARED 'tx-001';
Shard 3: ROLLBACK PREPARED 'tx-001';

Problem with 2PC: It's a blocking protocol. If the coordinator crashes between PREPARE and COMMIT, all participants are stuck holding locks. In practice, 2PC is slow and fragile.

Saga Pattern (preferred for microservices):

-- Saga: sequence of local transactions with compensating actions
Step 1: Shard A — debit user's wallet          (compensate: credit back)
Step 2: Shard B — create order record           (compensate: cancel order)
Step 3: Shard C — reserve inventory             (compensate: release inventory)

-- If Step 3 fails:
Compensate Step 2: cancel the order on Shard B
Compensate Step 1: credit the wallet on Shard A
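
A minimal saga-orchestrator sketch in Python. The step functions and their compensations are placeholders; the point is that each step is a local transaction on a single shard, and a failure triggers the compensations of completed steps in reverse order:

def run_saga(steps):
    """Each step is (action, compensation); on failure, undo completed steps in reverse."""
    completed = []
    try:
        for action, compensate in steps:
            action()                      # local transaction against a single shard
            completed.append(compensate)
    except Exception:
        for compensate in reversed(completed):
            compensate()                  # best-effort compensating transaction
        raise

# Wiring for the order flow above (debit_wallet etc. are placeholder step functions):
# run_saga([
#     (debit_wallet,      credit_wallet),
#     (create_order,      cancel_order),
#     (reserve_inventory, release_inventory),
# ])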

Aggregation Queries

-- "How many orders were placed today?" across all shards
def count_today_orders():
    total = 0
    for shard_id, conn in shard_connections.items():
        result = conn.execute(
            "SELECT COUNT(*) as cnt FROM orders WHERE created_at >= CURRENT_DATE"
        )
        total += result['cnt']
    return total

-- Better approach: maintain a real-time aggregation table
-- on a dedicated analytics shard or use CDC to stream to a data warehouse

Resharding

Resharding is the process of changing the number of shards or reassigning data between shards. It's typically triggered by a shard approaching its storage or write-throughput limit, by persistent hotspots on one shard, or by planned capacity growth.

Strategy 1: Consistent Hashing (Minimal Movement)

Instead of hash(key) % N, use a consistent hash ring. Adding a shard only moves ~1/N of keys instead of nearly all of them. See the next section for details.

Strategy 2: Double-Write Migration

# Phase 1: Start double-writing to old AND new shard
def insert_order(user_id, product_id, amount):
    old_shard = get_shard_v1(user_id)  # old routing
    new_shard = get_shard_v2(user_id)  # new routing

    old_shard.execute("INSERT INTO orders ...")
    new_shard.execute("INSERT INTO orders ...")  # double-write

# Phase 2: Backfill — copy historical data from old to new shards
for shard in old_shards:
    for row in shard.execute("SELECT * FROM orders"):
        new_shard = get_shard_v2(row['user_id'])
        new_shard.upsert(row)  # idempotent upsert

# Phase 3: Verify — compare row counts, checksums
# Phase 4: Cutover — switch reads to new shards
# Phase 5: Stop double-writes, decommission old shards
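
Phase 3 verification can be as simple as spot-checking per-user row counts between the two routings. A hedged sketch, assuming get_shard_v1 and get_shard_v2 return connections as in the snippet above:

import random

def verify_migration(sample_size=1000, max_user_id=100_000_000):
    """Spot-check that old and new routings hold identical data for sampled users."""
    mismatches = 0
    for _ in range(sample_size):
        user_id = random.randint(1, max_user_id)
        old_cnt = get_shard_v1(user_id).execute(
            "SELECT COUNT(*) AS cnt FROM orders WHERE user_id = %s", (user_id,))['cnt']
        new_cnt = get_shard_v2(user_id).execute(
            "SELECT COUNT(*) AS cnt FROM orders WHERE user_id = %s", (user_id,))['cnt']
        if old_cnt != new_cnt:
            mismatches += 1
    return mismatches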

Strategy 3: Backfill + Cutover (Vitess / gh-ost)

# Vitess resharding workflow for MySQL:
# 1. Create new shards with the desired key ranges
$ vtctl CreateShard -- mydb/-40     # left half
$ vtctl CreateShard -- mydb/40-     # right half

# 2. Start VReplication streams (online, no downtime); Vitess copies the
#    existing rows and keeps replicating new writes to the target shards
$ vtctl Reshard -- --source_shards '0' --target_shards '-40,40-' Create

# 3. When the targets have caught up, atomically switch traffic (< 1s impact)
$ vtctl Reshard -- --source_shards '0' --target_shards '-40,40-' SwitchTraffic

# 4. Finalize and clean up the old source shard
$ vtctl Reshard -- --source_shards '0' --target_shards '-40,40-' Complete

Vitess (developed at YouTube) is the gold standard for MySQL sharding. It handles shard routing, connection pooling, query rewriting, online resharding, and schema migrations. Slack, GitHub, Square, and HubSpot use it in production.

Consistent Hashing for Sharding

Standard hash-based sharding (hash(key) % N) has a catastrophic flaw: changing N redistributes almost every key. Consistent hashing solves this.

How It Works

  1. Hash ring: Both shard IDs and data keys are hashed onto a circular ring (0 to 2³²−1).
  2. Assignment: Each key is assigned to the next shard clockwise on the ring.
  3. Add shard: Only keys between the new shard and its predecessor move. ~K/N keys move (where K = total keys, N = shards).
  4. Remove shard: Only that shard's keys move to the next shard clockwise.

A compact implementation with virtual nodes:

import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, shards, virtual_nodes=150):
        self.ring = {}
        self.sorted_keys = []
        self.virtual_nodes = virtual_nodes

        for shard in shards:
            self.add_shard(shard)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_shard(self, shard: str):
        """Add a shard with virtual nodes for better distribution."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{shard}:vn{i}"
            h = self._hash(virtual_key)
            self.ring[h] = shard
            bisect.insort(self.sorted_keys, h)

    def remove_shard(self, shard: str):
        """Remove a shard — its keys automatically go to the next shard."""
        for i in range(self.virtual_nodes):
            virtual_key = f"{shard}:vn{i}"
            h = self._hash(virtual_key)
            del self.ring[h]
            self.sorted_keys.remove(h)

    def get_shard(self, key: str) -> str:
        """Find which shard owns this key."""
        if not self.ring:
            raise ValueError("No shards in the ring")
        h = self._hash(str(key))
        idx = bisect.bisect_right(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around
        return self.ring[self.sorted_keys[idx]]

# Usage
ring = ConsistentHashRing(["shard-0", "shard-1", "shard-2", "shard-3"])

# Route queries
shard = ring.get_shard("user:12345")  # → "shard-2"

# Add a new shard: only ~1/5 of the keys move (K/N with the new N = 5)
ring.add_shard("shard-4")
shard = ring.get_shard("user:12345")  # might still be "shard-2"

Virtual Nodes

Without virtual nodes, shards can end up owning very uneven portions of the ring. Virtual nodes (vnodes) map each physical shard to many points on the ring, so every shard owns many small arcs instead of one large one. This evens out the key distribution, and when a shard is removed its load is spread across all remaining shards rather than dumped on a single neighbor.
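
A quick way to see the effect is to measure, with the ConsistentHashRing class above, how many keys land on each shard under different virtual_nodes settings (the key count is arbitrary):

from collections import Counter

def distribution(virtual_nodes: int, num_keys: int = 100_000) -> Counter:
    ring = ConsistentHashRing(["shard-0", "shard-1", "shard-2", "shard-3"],
                              virtual_nodes=virtual_nodes)
    return Counter(ring.get_shard(f"user:{i}") for i in range(num_keys))

print(distribution(virtual_nodes=1))    # typically very lopsided
print(distribution(virtual_nodes=150))  # close to 25,000 keys per shard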

Real-World Examples

Instagram — PostgreSQL Sharded by user_id

Instagram's early architecture (pre-Facebook acquisition) used PostgreSQL with application-level sharding:

-- Instagram's ID generation (simplified, per-shard PL/pgSQL function)
CREATE OR REPLACE FUNCTION next_id(OUT result BIGINT) AS $$
DECLARE
    our_epoch  BIGINT := 1314220021721;  -- custom epoch
    seq_id     BIGINT;
    now_millis BIGINT;
    shard_id   INT := 5;  -- different per shard
BEGIN
    SELECT nextval('table_id_seq') % 1024 INTO seq_id;
    SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
    result := (now_millis - our_epoch) << 23;   -- 41 bits: timestamp
    result := result | (shard_id << 10);         -- 13 bits: shard ID
    result := result | (seq_id);                 -- 10 bits: sequence
END;
$$ LANGUAGE plpgsql;
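
The payoff of embedding the shard ID is that any service can route a request from the ID alone. A small Python sketch that reverses the bit layout above (the epoch constant mirrors our_epoch in the function; treat it as illustrative):

INSTAGRAM_EPOCH_MS = 1314220021721  # must match our_epoch in the PL/pgSQL function

def decode_id(photo_id: int) -> dict:
    """Unpack an Instagram-style ID: 41-bit timestamp | 13-bit shard | 10-bit sequence."""
    return {
        "timestamp_ms": (photo_id >> 23) + INSTAGRAM_EPOCH_MS,
        "shard_id":     (photo_id >> 10) & ((1 << 13) - 1),
        "sequence":     photo_id & ((1 << 10) - 1),
    }

# Routing a read: the owning shard is known from the ID, no lookup table needed
# shard = decode_id(photo_id)["shard_id"]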

Discord — channel_id Sharding on Cassandra → ScyllaDB

-- Discord's message table (Cassandra CQL, simplified)
CREATE TABLE messages (
    channel_id  BIGINT,
    bucket      INT,          -- time bucket to limit partition size
    message_id  BIGINT,       -- Snowflake ID (contains timestamp)
    author_id   BIGINT,
    content     TEXT,
    PRIMARY KEY ((channel_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);
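
Because the partition key includes a time bucket, the application derives the bucket before it can read or write. A sketch of that derivation: the epoch is the documented Discord Snowflake epoch, but the bucket window size here is an assumption for illustration:

DISCORD_EPOCH_MS = 1420070400000          # first millisecond of 2015 (Snowflake epoch)
BUCKET_WINDOW_MS = 10 * 24 * 3600 * 1000  # assumed ~10-day window, illustrative only

def bucket_for(message_id: int) -> int:
    """Derive the time bucket from the timestamp embedded in a Snowflake ID."""
    ms_since_epoch = message_id >> 22      # Snowflake: top bits are a ms offset from the epoch
    return ms_since_epoch // BUCKET_WINDOW_MS

Recent-history queries then read one partition at a time, walking backwards bucket by bucket until enough messages are collected.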

Slack — Vitess for MySQL

Slack runs its sharded MySQL fleet behind Vitess, which provides the shard routing, connection pooling, and online resharding workflow described above.

Uber — Schemaless (Custom Sharded Storage)

Uber built Schemaless, an append-only datastore layered on top of many sharded MySQL instances, to scale trip data beyond what a single database could handle.

Sharding in Interviews

When to Propose Sharding

In a system design interview, don't jump to sharding. Mention it when your back-of-the-envelope numbers exceed a single node's limits: tens of terabytes of data, sustained write throughput beyond what one primary can absorb, or a single table that is both huge and hot, and only after you have already proposed read replicas, caching, and vertical scaling.

How to Discuss Shard Key Selection

  1. Identify the primary access pattern. "Users will query their own orders, so user_id is the natural shard key."
  2. Evaluate cardinality and distribution. "We have 100M users, evenly distributed — good cardinality."
  3. Consider hotspots. "Power users generate 1000× more data. We can handle this with caching or dedicated shards."
  4. Address cross-shard queries. "Admin queries that span all users will use scatter-gather with pre-aggregated tables."

Common Follow-Up Questions

Question | Strong Answer Pattern
"What if one shard gets hot?" | Monitor per-shard metrics. Short term: cache hot keys. Long term: split the shard (range-based) or add vnodes (consistent hashing).
"How do you handle cross-shard joins?" | Avoid them. Denormalize frequently joined data. For rare needs, use scatter-gather or a materialized view in an analytics DB.
"How do you add a new shard?" | Use consistent hashing (move ~1/N keys). Double-write to old + new shard during migration. Vitess-style online resharding for MySQL.
"What about auto-increment IDs?" | Can't use DB auto-increment across shards (collisions). Use Snowflake IDs (Twitter), UUIDs, or Instagram-style ID generation encoding shard ID.
"How do you maintain global uniqueness?" | Each shard generates IDs with embedded shard identifier, or use a central ID service (Twitter Snowflake, Leaf by Meituan).

Quick Reference: Sharding Checklist

  1. ☐ Exhaust vertical scaling, caching, read replicas first
  2. ☐ Choose shard key (high cardinality, even distribution, query locality)
  3. ☐ Pick strategy (hash / range / directory / geographic)
  4. ☐ Plan for cross-shard queries (denormalize, scatter-gather)
  5. ☐ Design ID generation (Snowflake, embedded shard ID)
  6. ☐ Plan resharding strategy (consistent hashing, Vitess)
  7. ☐ Monitor per-shard metrics (size, latency, connections)
  8. ☐ Handle hot keys (caching, dedicated shards, key salting)

Summary