Design: Notification System
Introduction
Notifications are the nervous system of modern applications. Every time a friend likes your photo, a package ships, your bank detects a suspicious transaction, or a breaking news story emerges — a notification reaches you through one or more channels. Behind every "ding" on your phone is a distributed system processing hundreds of millions of events per day, routing them through the right channel, respecting user preferences, and handling failures gracefully.
Designing a notification system at scale is far more than calling sendPush(). It involves orchestrating multiple third-party providers (APNs, FCM, Twilio, SendGrid), building reliable message queues, handling retry logic, preventing duplicate deliveries, and respecting user preferences — all while maintaining sub-second latency for time-critical alerts.
In this post, we'll design a notification system capable of handling 100 million daily notifications across push, SMS, and email channels — the kind of system powering companies like Uber, Slack, and Instagram.
Requirements
Functional Requirements
- Multi-channel delivery: Support push notifications (iOS & Android), SMS, and email
- User preferences: Allow users to opt in/out per channel and per notification type
- Template engine: Support reusable notification templates with dynamic variables
- Priority levels: High (security alerts), medium (social), low (marketing)
- Rate limiting: Cap the number of notifications per user per time window
- Retry on failure: Automatically retry failed deliveries with exponential backoff
- Analytics: Track delivery rate, open rate, and click-through rate
Non-Functional Requirements
- Scale: 100 million daily notifications (~1,160 per second average, ~3,500/s peak)
- Latency: High-priority notifications delivered within 1 second; low-priority within 5 minutes
- Reliability: At-least-once delivery with deduplication to prevent duplicates
- Availability: 99.99% uptime — notification systems are critical infrastructure
- Extensibility: Easy to add new channels (e.g., Slack, WhatsApp, in-app) without re-architecting
Back-of-the-Envelope Estimation
Daily notifications: 100,000,000
Average per second: 100M / 86,400 ≈ 1,160 notifications/s
Peak (3× average): ~3,500 notifications/s
Burst (10× for events): ~12,000 notifications/s
Channel breakdown (typical):
Push: 60% → 60M/day → ~700/s avg
Email: 30% → 30M/day → ~350/s avg
SMS: 10% → 10M/day → ~115/s avg
Storage per notification: ~500 bytes metadata
Daily storage: 100M × 500B = 50 GB/day → 18 TB/year
Device tokens: 500M users × 2 devices avg × 64B = ~64 GB
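The arithmetic above can be sanity-checked in a few lines of Python (same inputs as the estimates; small rounding differences aside):

```python
# Back-of-the-envelope estimation, as executable arithmetic
DAILY = 100_000_000

avg_per_sec = DAILY / 86_400        # ≈ 1,157 notifications/s
peak_per_sec = avg_per_sec * 3      # ≈ 3,500/s
burst_per_sec = avg_per_sec * 10    # ≈ 11,600/s

META_BYTES = 500                    # metadata per notification
daily_storage_gb = DAILY * META_BYTES / 1e9        # 50 GB/day
yearly_storage_tb = daily_storage_gb * 365 / 1e3   # ≈ 18 TB/year

# 500M users × 2 devices × 64-byte tokens
token_storage_gb = 500_000_000 * 2 * 64 / 1e9      # 64 GB
```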
Notification Channels Deep Dive
iOS Push Notifications — APNs
Apple Push Notification service (APNs) is the gateway for all iOS push notifications. Your server communicates with APNs over HTTP/2, sending a JSON payload to a specific device token.
// APNs HTTP/2 Request
POST /3/device/{device_token}
Host: api.push.apple.com
Authorization: bearer {jwt_token}
apns-topic: com.myapp.bundleid
apns-priority: 10 // 10 = immediate, 5 = power-saving
apns-push-type: alert
apns-expiration: 0 // 0 = don't store if device offline
{
"aps": {
"alert": {
"title": "Security Alert 🔒",
"subtitle": "Unusual login detected",
"body": "Someone signed in from Moscow, Russia. Was this you?"
},
"badge": 3,
"sound": "critical_alert.caf",
"category": "SECURITY_ALERT", // Action buttons
"mutable-content": 1, // Enable Notification Service Extension
"content-available": 1 // Enable background fetch
},
"custom_data": {
"notification_id": "ntf_8f3a2b",
"action_url": "/security/review",
"timestamp": 1714089600
}
}
Key APNs concepts:
- Device Token: A unique, opaque identifier assigned by APNs to a specific device+app combination. Tokens can change — always use the latest token from the app registration callback
- Token-based authentication (JWT): Sign a JWT with your APNs key (P8 file) — preferred over certificate-based auth. Tokens are valid for 1 hour
- Payload size limit: 4 KB for regular notifications, 5 KB for VoIP
- Expired tokens: APNs returns HTTP 410 (Unregistered) for stale device tokens — remove them from your database. (The legacy binary feedback service is retired; error reporting now happens inline on the HTTP/2 response.)
- Rate limiting: Apple may throttle if you send too many notifications to offline devices
# Python: Sending via APNs over HTTP/2 (using httpx + PyJWT)
import jwt
import time
import httpx
class APNsClient:
SANDBOX = "https://api.sandbox.push.apple.com"
PRODUCTION = "https://api.push.apple.com"
def __init__(self, key_path, key_id, team_id, bundle_id,
sandbox=False):
self.key_id = key_id
self.team_id = team_id
self.bundle_id = bundle_id
self.base_url = self.SANDBOX if sandbox else self.PRODUCTION
with open(key_path, 'r') as f:
self.private_key = f.read()
self._token = None
self._token_time = 0
def _get_jwt(self):
"""JWT tokens are valid for 1 hour — cache and refresh."""
now = time.time()
if self._token and (now - self._token_time) < 3000:
return self._token
self._token = jwt.encode(
{"iss": self.team_id, "iat": int(now)},
self.private_key,
algorithm="ES256",
headers={"kid": self.key_id}
)
self._token_time = now
return self._token
async def send(self, device_token, payload, priority=10):
"""Send a notification. Returns (success, response)."""
url = f"{self.base_url}/3/device/{device_token}"
headers = {
"authorization": f"bearer {self._get_jwt()}",
"apns-topic": self.bundle_id,
"apns-priority": str(priority),
"apns-push-type": "alert",
}
        # NOTE: in production, reuse one long-lived HTTP/2 connection —
        # APNs treats rapid connection churn as abusive
        async with httpx.AsyncClient(http2=True) as client:
resp = await client.post(url, json=payload,
headers=headers)
if resp.status_code == 200:
return True, {"apns_id": resp.headers["apns-id"]}
error = resp.json()
# Handle specific error codes
if resp.status_code == 410: # Unregistered
return False, {"error": "TOKEN_EXPIRED",
"remove_token": True}
if resp.status_code == 429: # Too many requests
return False, {"error": "RATE_LIMITED",
"retry_after": 5}
return False, {"error": error.get("reason"),
"status": resp.status_code}
Android Push Notifications — FCM
Firebase Cloud Messaging (FCM) is Google's cross-platform messaging solution. It supports both notification messages (displayed automatically) and data messages (handled by app code).
// FCM HTTP v1 API Request
POST https://fcm.googleapis.com/v1/projects/{project_id}/messages:send
Authorization: Bearer {oauth2_token}
Content-Type: application/json
{
"message": {
"token": "fMC_registration_token_abc123...",
"notification": {
"title": "New Message from Alice",
"body": "Hey! Are you coming to the party tonight?",
"image": "https://cdn.example.com/alice_avatar.jpg"
},
"data": {
"notification_id": "ntf_9d4c3a",
"conversation_id": "conv_123",
"click_action": "OPEN_CHAT"
},
"android": {
"priority": "HIGH",
"notification": {
"channel_id": "messages",
"click_action": "OPEN_CHAT",
"color": "#4285F4",
"sound": "message_received"
},
"ttl": "86400s"
},
"apns": {
"payload": {
"aps": { "badge": 5 }
}
},
"webpush": {
"notification": {
"icon": "/icon-192.png"
}
}
}
}
FCM features:
- Topic messaging: Subscribe devices to topics (e.g., "sports_news") and send to all subscribers
- Device groups: Send to multiple devices belonging to one user in a single request
- Condition targeting: Boolean logic on topics — 'sports' in topics && !('cricket' in topics)
- Payload limit: 4 KB for notification messages, 4 KB for data messages
- Upstream messaging: Devices can send messages back to the server (for acknowledgements)
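A small helper can assemble the FCM HTTP v1 message body shown above. This is a sketch: actually sending it requires an OAuth2 bearer token minted from a service account (e.g. via the google-auth library), which is omitted here, and the function name is illustrative.

```python
def build_fcm_message(token, title, body, data=None, priority="HIGH",
                      ttl_seconds=86400):
    """Assemble an FCM HTTP v1 message body (see JSON example above).

    Note: FCM requires all `data` values to be strings, so we
    stringify them defensively.
    """
    return {
        "message": {
            "token": token,
            "notification": {"title": title, "body": body},
            "data": {k: str(v) for k, v in (data or {}).items()},
            "android": {
                "priority": priority,
                "ttl": f"{ttl_seconds}s",   # v1 API expects "<n>s"
            },
        }
    }

msg = build_fcm_message("reg_token_abc", "New Message", "Hello!",
                        data={"conversation_id": "conv_123"})
```

The body would then be POSTed to `/v1/projects/{project_id}/messages:send` with the OAuth2 token in the Authorization header.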
SMS — Twilio
SMS is the most reliable channel for critical notifications (2FA codes, security alerts) because it doesn't depend on an internet connection or app installation.
# Sending SMS via Twilio
from twilio.rest import Client
client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
message = client.messages.create(
body="Your verification code is 847291. "
"It expires in 10 minutes.",
from_="+15551234567", # Your Twilio number
to="+15559876543", # Recipient
status_callback="https://api.example.com/webhooks/sms/status"
)
# Status callback receives delivery status:
# queued → sending → sent → delivered (or failed/undelivered)
print(f"SID: {message.sid}, Status: {message.status}")
# Cost considerations:
# US SMS: ~$0.0079/segment (160 chars)
# International: $0.01–$0.10/segment
# At 10M SMS/day = $79,000 – $1M/day! → Use SMS sparingly
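Because SMS is billed per segment, message length directly drives cost. A rough segment counter (a sketch — it checks the GSM 03.38 basic character set only and ignores extension characters like €, which count double):

```python
import math

# Approximation of the GSM 03.38 basic character set
GSM7_BASIC = set(
    "@£$¥èéùìòÇ\nØø\rÅåΔ_ΦΓΛΩΠΨΣΘΞÆæßÉ !\"#¤%&'()*+,-./"
    "0123456789:;<=>?¡ABCDEFGHIJKLMNOPQRSTUVWXYZÄÖÑܧ"
    "¿abcdefghijklmnopqrstuvwxyzäöñüà"
)

def sms_segments(text):
    """GSM-7 → 160 chars/segment (153 when concatenated);
    any other character forces UCS-2 → 70 (67 concatenated)."""
    if all(c in GSM7_BASIC for c in text):
        single, multi = 160, 153
    else:
        single, multi = 70, 67
    if len(text) <= single:
        return 1
    return math.ceil(len(text) / multi)

def sms_cost(text, per_segment=0.0079):
    """Estimated cost at the US rate quoted above."""
    return sms_segments(text) * per_segment
```

One emoji in a marketing SMS can triple its cost by forcing UCS-2 encoding — worth validating templates against.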
Email — SendGrid / Amazon SES
Email handles high-volume, rich-content notifications like order confirmations, newsletters, and weekly digests.
# Sending via SendGrid
import sendgrid
from sendgrid.helpers.mail import Mail, Email, To, Content
sg = sendgrid.SendGridAPIClient(api_key=SENDGRID_API_KEY)
message = Mail(
from_email=Email("noreply@example.com", "MyApp"),
to_emails=To("user@gmail.com", "John Doe"),
subject="Your order #12345 has shipped! 📦",
)
# Use a dynamic template (recommended)
message.template_id = "d-abc123def456"
message.dynamic_template_data = {
"user_name": "John",
"order_id": "12345",
"tracking_url": "https://track.example.com/12345",
"items": [
{"name": "Wireless Headphones", "qty": 1, "price": "$79.99"},
{"name": "USB-C Cable", "qty": 2, "price": "$12.99"},
],
"total": "$105.97"
}
# Custom headers for tracking (the Header helper appends a header
# each time it is assigned)
from sendgrid.helpers.mail import Header
message.header = Header("X-Notification-ID", "ntf_5e2c1a")
message.header = Header("List-Unsubscribe",
                        "<https://example.com/unsub?t=abc>")
response = sg.send(message)
# SendGrid pricing: 100K emails/month free, then ~$0.001/email
# Amazon SES: $0.10 per 1,000 emails
High-Level Architecture
The architecture follows an event-driven, queue-based pattern that decouples notification producers from delivery channels. This allows each channel to scale independently, retry failures without blocking other notifications, and absorb traffic spikes via queues.
┌─────────────────────────────────────────────────────────────────┐
│ Notification Producers │
│ (User Service, Order Service, Payment Service, Auth Service) │
└─────────────────────────┬───────────────────────────────────────┘
│ gRPC / REST
▼
┌─────────────────────────────────────────────────────────────────┐
│ Notification Service │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌───────────────┐ │
│ │ Validate │→│ Dedup │→│ Check │→│ Template │ │
│ │ Request │ │ Check │ │ Prefs │ │ Engine │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────┬────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────────▼────────┐ │
│ │ Priority Router / Rate Limiter │ │
│ └──┬──────────────┬──────────────┬──────────────┬──────────┘ │
└─────┼──────────────┼──────────────┼──────────────┼──────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐
│Push Queue│ │SMS Queue│ │Email Q │ │In-App Queue │
│(Kafka) │ │(Kafka) │ │(Kafka) │ │(Kafka) │
└────┬─────┘ └────┬────┘ └────┬────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐
│Push │ │SMS │ │Email │ │In-App │
│Workers │ │Workers │ │Workers │ │Workers │
│(N pods) │ │(N pods) │ │(N pods) │ │(N pods) │
└────┬─────┘ └────┬────┘ └────┬────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐
│APNs/FCM │ │Twilio │ │SendGrid │ │WebSocket │
│ │ │ │ │/ SES │ │Server │
└──────────┘ └─────────┘ └─────────┘ └─────────────┘
Component Responsibilities
| Component | Responsibility |
|---|---|
| Notification Service | API gateway — validates, deduplicates, checks preferences, renders templates, routes to queues |
| User Preference Service | Stores per-user, per-channel opt-in/out settings; quiet hours; language preferences |
| Template Engine | Renders notification content from templates + variables, handles i18n |
| Priority Router | Routes to per-priority queues; applies rate limiting |
| Channel Queues (Kafka) | One topic per channel — buffers notifications, enables async processing |
| Channel Workers | Consume from queues, call third-party APIs, handle retries |
| Analytics Service | Tracks delivery, open, click events via webhooks and pixel tracking |
Notification Service API
// POST /api/v1/notifications
// Send a notification to one or more recipients
{
"notification_id": "ntf_unique_idempotency_key",
"type": "ORDER_SHIPPED",
"priority": "medium", // high | medium | low
"recipients": [
{
"user_id": "user_abc123",
"channels": ["push", "email"], // optional override
"variables": { // per-recipient template vars
"order_id": "ORD-12345",
"tracking_url": "https://track.example.com/12345"
}
}
],
"template_id": "tmpl_order_shipped",
"fallback_channels": ["sms"], // if primary channels fail
"schedule_at": null, // null = send immediately
"expires_at": "2026-04-15T12:00:00Z"
}
// Response
{
"request_id": "req_7f2a9c",
"status": "accepted",
"notifications_queued": 2, // push + email
"estimated_delivery": "< 1s"
}
// Batch API for bulk notifications (e.g., marketing campaigns)
// POST /api/v1/notifications/batch
{
"batch_id": "batch_spring_sale_2026",
"template_id": "tmpl_spring_sale",
"priority": "low",
"segment": {
"filter": "last_active > '2026-01-01' AND country IN ('US','CA')",
"estimated_count": 2500000
},
"throttle": {
"max_per_second": 1000, // don't overwhelm providers
"window_hours": 4 // spread over 4 hours
}
}
User Preference Service
User preferences are critical — sending notifications to users who opted out is both a bad experience and potentially a legal violation (GDPR, CAN-SPAM, TCPA). The preference service is a read-heavy microservice backed by a fast key-value store.
Data Model
// User notification preferences table
CREATE TABLE user_notification_prefs (
user_id VARCHAR(64) NOT NULL,
notification_type VARCHAR(64) NOT NULL, -- e.g., ORDER_SHIPPED
channel VARCHAR(16) NOT NULL, -- push | sms | email
enabled BOOLEAN DEFAULT TRUE,
updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (user_id, notification_type, channel)
);
// Global user settings
CREATE TABLE user_notification_settings (
user_id VARCHAR(64) PRIMARY KEY,
quiet_hours_start TIME, -- e.g., 22:00
quiet_hours_end TIME, -- e.g., 07:00
timezone VARCHAR(64), -- e.g., America/New_York
language VARCHAR(8), -- e.g., en, es, ja
marketing_opt_in BOOLEAN DEFAULT TRUE,
digest_frequency VARCHAR(16), -- realtime | hourly | daily
updated_at TIMESTAMP DEFAULT NOW()
);
// Device tokens table
CREATE TABLE user_devices (
user_id VARCHAR(64) NOT NULL,
device_id VARCHAR(128) NOT NULL,
platform VARCHAR(16) NOT NULL, -- ios | android | web
device_token VARCHAR(512) NOT NULL, -- APNs/FCM token
app_version VARCHAR(16),
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW(),
last_seen_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (user_id, device_id)
);
CREATE INDEX idx_device_token ON user_devices(device_token);
CREATE INDEX idx_platform ON user_devices(user_id, platform);
Preference Check Flow
import json
from datetime import datetime

import pytz

class PreferenceService:
def __init__(self, cache, db):
self.cache = cache # Redis
self.db = db # PostgreSQL
    async def get_enabled_channels(self, user_id, notification_type):
        """Return list of channels the user wants for this type."""
        enabled = await self._get_channel_prefs(user_id,
                                                notification_type)
        # Quiet hours suppress delivery entirely; high-priority
        # notifications (2FA, security alerts) bypass this check
        # upstream in the Notification Service
        settings = await self._get_settings(user_id)
        if self._in_quiet_hours(settings):
            return []
        return enabled

    async def _get_channel_prefs(self, user_id, notification_type):
        # 1. Check cache first (TTL: 5 minutes)
        cache_key = f"prefs:{user_id}:{notification_type}"
        cached = await self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        # 2. Query database
        prefs = await self.db.fetch_all("""
            SELECT channel, enabled
            FROM user_notification_prefs
            WHERE user_id = $1 AND notification_type = $2
        """, user_id, notification_type)
        # 3. Build channel list (default: all enabled)
        channels = {p["channel"]: p["enabled"] for p in prefs}
        enabled = [ch for ch in ["push", "email", "sms"]
                   if channels.get(ch, True)]  # default ON
        # 4. Cache and return — quiet hours are applied by the
        #    caller because they are time-dependent and must not
        #    be frozen into the 5-minute cache entry
        await self.cache.setex(cache_key, 300, json.dumps(enabled))
        return enabled
def _in_quiet_hours(self, settings):
if not settings.get("quiet_hours_start"):
return False
user_tz = pytz.timezone(settings.get("timezone", "UTC"))
now = datetime.now(user_tz).time()
start = settings["quiet_hours_start"]
end = settings["quiet_hours_end"]
if start < end:
return start <= now <= end
else: # overnight (e.g., 22:00 - 07:00)
return now >= start or now <= end
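The overnight wrap-around in `_in_quiet_hours` is the part most often gotten wrong. A pure-function version of the same check, handy for unit testing (the standalone name is illustrative):

```python
from datetime import time

def in_quiet_hours(now, start, end):
    """True if `now` falls inside the quiet window, including
    windows that cross midnight (e.g. 22:00–07:00)."""
    if start < end:                       # same-day window
        return start <= now <= end
    return now >= start or now <= end     # overnight wrap

# Overnight window 22:00–07:00
assert in_quiet_hours(time(23, 0), time(22, 0), time(7, 0))
assert in_quiet_hours(time(6, 30), time(22, 0), time(7, 0))
assert not in_quiet_hours(time(12, 0), time(22, 0), time(7, 0))
```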
Template Engine
A template engine separates notification content from delivery logic. Product teams can update notification copy without deploying code changes. Templates support variables, conditionals, and i18n.
Template Data Model
CREATE TABLE notification_templates (
template_id VARCHAR(64) PRIMARY KEY,
notification_type VARCHAR(64) NOT NULL,
channel VARCHAR(16) NOT NULL, -- push | sms | email
language VARCHAR(8) DEFAULT 'en',
version INT DEFAULT 1,
is_active BOOLEAN DEFAULT TRUE,
-- Channel-specific fields
subject TEXT, -- email only
title TEXT, -- push only
body TEXT NOT NULL, -- all channels
html_body TEXT, -- email only (rich HTML)
image_url TEXT, -- push only
action_url TEXT, -- deep link / CTA
-- Metadata
variables JSONB, -- expected variables schema
created_by VARCHAR(64),
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE (notification_type, channel, language, version)
);
Template Examples
// Push template: ORDER_SHIPPED
{
"template_id": "tmpl_order_shipped_push_en_v3",
"notification_type": "ORDER_SHIPPED",
"channel": "push",
"language": "en",
"title": "Your order is on the way! 📦",
"body": "Order #{{order_id}} shipped via {{carrier}}. "
"Estimated delivery: {{eta}}.",
"image_url": "{{product_image_url}}",
"action_url": "myapp://orders/{{order_id}}/tracking",
"variables": {
"order_id": {"type": "string", "required": true},
"carrier": {"type": "string", "required": true},
"eta": {"type": "string", "required": true},
"product_image_url": {"type": "string", "required": false}
}
}
// Email template: ORDER_SHIPPED (HTML)
{
"template_id": "tmpl_order_shipped_email_en_v3",
"notification_type": "ORDER_SHIPPED",
"channel": "email",
"subject": "Your order #{{order_id}} has shipped!",
"body": "Hi {{user_name}}, your order has shipped...",
"html_body": "<!DOCTYPE html>...rich email template..."
}
// SMS template: VERIFICATION_CODE
{
"template_id": "tmpl_2fa_sms_en_v1",
"notification_type": "VERIFICATION_CODE",
"channel": "sms",
"body": "{{app_name}}: Your code is {{code}}. "
"Expires in {{expiry_min}} min. "
"Don't share this code.",
"variables": {
"app_name": {"type": "string", "default": "MyApp"},
"code": {"type": "string", "required": true},
"expiry_min": {"type": "number", "default": 10}
}
}
class TemplateEngine:
"""Renders notification templates with variable substitution."""
def __init__(self, cache, db):
self.cache = cache
self.db = db
# Precompile regex for {{variable}} patterns
self._var_pattern = re.compile(r'\{\{(\w+)\}\}')
async def render(self, template_id, variables, language='en'):
"""Render a template with the given variables."""
# Fetch template (cached)
template = await self._get_template(template_id, language)
if not template:
raise TemplateNotFoundError(template_id)
# Validate required variables
schema = template.get("variables", {})
for var_name, var_config in schema.items():
if var_config.get("required") and var_name not in variables:
raise MissingVariableError(var_name, template_id)
# Apply defaults
if var_name not in variables and "default" in var_config:
variables[var_name] = var_config["default"]
# Render each text field
rendered = {}
for field in ["title", "subject", "body", "html_body",
"image_url", "action_url"]:
if template.get(field):
rendered[field] = self._substitute(
template[field], variables)
return rendered
def _substitute(self, text, variables):
"""Replace {{var}} placeholders with values."""
def replacer(match):
key = match.group(1)
value = variables.get(key, "")
return self._sanitize(str(value))
return self._var_pattern.sub(replacer, text)
    def _sanitize(self, value):
        """Escape HTML special characters to prevent injection
        in notification content."""
        return (value
                .replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;"))
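Putting the SMS template from above through this substitution logic gives a concrete rendered message. This standalone sketch mirrors `_substitute` (minus the HTML escaping, which SMS does not need):

```python
import re

VAR = re.compile(r"\{\{(\w+)\}\}")

def render(text, variables):
    """Minimal {{var}} substitution; missing variables become ''."""
    return VAR.sub(lambda m: str(variables.get(m.group(1), "")), text)

body = ("{{app_name}}: Your code is {{code}}. "
        "Expires in {{expiry_min}} min. Don't share this code.")
msg = render(body, {"app_name": "MyApp", "code": "847291",
                    "expiry_min": 10})
# msg == "MyApp: Your code is 847291. Expires in 10 min. Don't share this code."
```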
Priority Queues
Not all notifications are created equal. A 2FA code must arrive in seconds; a marketing email can wait hours. We use separate Kafka topics per priority level to ensure high-priority messages are never blocked behind a backlog of low-priority ones.
Priority Levels
| Priority | SLA | Examples | Worker Allocation |
|---|---|---|---|
| 🔴 High (P0) | < 1 second | 2FA codes, security alerts, payment failures | 40% of workers |
| 🟡 Medium (P1) | < 30 seconds | Social notifications, chat messages, order updates | 40% of workers |
| 🟢 Low (P2) | < 5 minutes | Marketing emails, weekly digests, recommendations | 20% of workers |
# Kafka topic naming convention
# notifications.{channel}.{priority}
TOPICS = {
("push", "high"): "notifications.push.high",
("push", "medium"): "notifications.push.medium",
("push", "low"): "notifications.push.low",
("sms", "high"): "notifications.sms.high",
("sms", "medium"): "notifications.sms.medium",
("email", "high"): "notifications.email.high",
("email", "medium"):"notifications.email.medium",
("email", "low"): "notifications.email.low",
}
# Worker allocation: high-priority consumers get more partitions
# notifications.push.high → 12 partitions, 12 consumers
# notifications.push.medium → 8 partitions, 8 consumers
# notifications.push.low → 4 partitions, 4 consumers
class PriorityRouter:
"""Routes notifications to the correct Kafka topic."""
PRIORITY_MAP = {
# notification_type → default priority
"VERIFICATION_CODE": "high",
"SECURITY_ALERT": "high",
"PAYMENT_FAILED": "high",
"PASSWORD_RESET": "high",
"NEW_MESSAGE": "medium",
"NEW_FOLLOWER": "medium",
"ORDER_SHIPPED": "medium",
"COMMENT_REPLY": "medium",
"MARKETING_CAMPAIGN": "low",
"WEEKLY_DIGEST": "low",
"RECOMMENDATION": "low",
}
def __init__(self, kafka_producer):
self.producer = kafka_producer
async def route(self, notification):
"""Route a notification to the correct queue."""
priority = notification.get("priority") or \
self.PRIORITY_MAP.get(notification["type"], "medium")
for channel in notification["channels"]:
topic = f"notifications.{channel}.{priority}"
message = {
"notification_id": notification["notification_id"],
"user_id": notification["user_id"],
"channel": channel,
"priority": priority,
"rendered_content": notification["rendered"],
"device_tokens": notification.get("device_tokens"),
"recipient_address": notification.get("address"),
"attempt": 0,
"max_retries": 3 if priority == "high" else 5,
"created_at": time.time(),
"expires_at": notification.get("expires_at"),
}
await self.producer.send(
topic,
key=notification["user_id"].encode(),
value=json.dumps(message).encode(),
)
Rate Limiting
Nobody wants to receive 50 notifications in an hour. Rate limiting protects users from notification fatigue and protects your system from runaway services flooding the pipeline.
Multi-Level Rate Limits
# Rate limit configuration
RATE_LIMITS = {
# Per-user limits
"user": {
"push": {"max": 30, "window": "1h"},
"sms": {"max": 5, "window": "1h"},
"email": {"max": 10, "window": "1d"},
},
# Per-user per-type limits (prevent spam from single source)
"user_type": {
"NEW_FOLLOWER": {"max": 10, "window": "1h"},
"COMMENT_REPLY": {"max": 20, "window": "1h"},
"MARKETING_CAMPAIGN": {"max": 1, "window": "1d"},
},
# Global limits (protect third-party API quotas)
"global": {
"sms": {"max": 100000, "window": "1h"},
"email": {"max": 500000, "window": "1h"},
},
# Exempt from rate limiting
"exempt_types": [
"VERIFICATION_CODE", "SECURITY_ALERT",
"PASSWORD_RESET", "PAYMENT_FAILED"
]
}
class NotificationRateLimiter:
"""Sliding-window rate limiter using Redis sorted sets."""
def __init__(self, redis):
self.redis = redis
async def check_and_increment(self, user_id, channel,
notification_type):
"""Return True if allowed, False if rate-limited."""
        # Security-critical types are never rate-limited
        if notification_type in RATE_LIMITS["exempt_types"]:
            return True
        # (The global per-channel caps in RATE_LIMITS["global"] are
        # enforced the same way, keyed by channel alone — omitted
        # here for brevity)
now = time.time()
pipe = self.redis.pipeline()
# Check 1: Per-user per-channel limit
key1 = f"rl:user:{user_id}:{channel}"
limit1 = RATE_LIMITS["user"][channel]
window1 = self._parse_window(limit1["window"])
# Check 2: Per-user per-type limit
key2 = f"rl:user_type:{user_id}:{notification_type}"
limit2 = RATE_LIMITS["user_type"].get(notification_type)
# Sliding window: remove expired entries, count remaining
pipe.zremrangebyscore(key1, 0, now - window1)
pipe.zcard(key1)
if limit2:
window2 = self._parse_window(limit2["window"])
pipe.zremrangebyscore(key2, 0, now - window2)
pipe.zcard(key2)
results = await pipe.execute()
# Evaluate limits
count1 = results[1]
if count1 >= limit1["max"]:
return False
if limit2:
count2 = results[3]
if count2 >= limit2["max"]:
return False
# All checks passed — record this notification
pipe2 = self.redis.pipeline()
pipe2.zadd(key1, {f"{now}": now})
pipe2.expire(key1, window1 + 60)
if limit2:
pipe2.zadd(key2, {f"{now}": now})
pipe2.expire(key2, self._parse_window(limit2["window"]) + 60)
await pipe2.execute()
return True
def _parse_window(self, w):
if w.endswith("h"): return int(w[:-1]) * 3600
if w.endswith("d"): return int(w[:-1]) * 86400
return int(w)
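The Redis limiter above needs a live Redis to exercise. The same sliding-window policy can be sketched in-process — useful for unit tests, and for understanding what the sorted-set operations are doing (class name is illustrative):

```python
import time
from collections import defaultdict, deque

class InMemorySlidingWindow:
    """Sliding-window limiter: keep a timestamp per event, evict
    entries older than the window, reject once the count hits max.
    Mirrors the ZREMRANGEBYSCORE + ZCARD + ZADD pattern above."""

    def __init__(self, max_events, window_seconds):
        self.max = max_events
        self.window = window_seconds
        self.events = defaultdict(deque)   # key → timestamps

    def allow(self, key, now=None):
        now = time.time() if now is None else now
        q = self.events[key]
        while q and q[0] <= now - self.window:   # evict expired
            q.popleft()
        if len(q) >= self.max:
            return False                         # rate-limited
        q.append(now)                            # record this event
        return True
```

Unlike the Redis version, this state is per-process, so it only matches the distributed behavior when a user's traffic is pinned to one worker (which the Kafka partitioning by user_id above happens to provide).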
Retry with Exponential Backoff
Third-party providers fail. APNs has outages. Twilio returns 500s. SendGrid throttles you. A robust notification system must retry failed deliveries without overwhelming the provider or delaying other notifications.
Retry Strategy
RETRY_CONFIG = {
"max_retries": 5,
"backoff_base": 1, # seconds
"backoff_multiplier": 5, # exponential factor
"backoff_max": 300, # cap at 5 minutes
"jitter": True, # add randomness to prevent thundering herd
# Retry delays: 1s → 5s → 25s → 125s → 300s (capped)
    # With jitter (example random draws): 0.8s → 4.2s → 22s → 98s → 267s
}
def calculate_backoff(attempt, config=RETRY_CONFIG):
"""Calculate delay for retry attempt (0-indexed)."""
delay = config["backoff_base"] * \
(config["backoff_multiplier"] ** attempt)
delay = min(delay, config["backoff_max"])
if config["jitter"]:
# Full jitter: random between 0 and calculated delay
delay = random.uniform(0, delay)
return delay
# Attempt 0 (first retry): 0–1s
# Attempt 1: 0–5s
# Attempt 2: 0–25s
# Attempt 3: 0–125s
# Attempt 4: 0–300s
class ChannelWorker:
"""Base worker that consumes from a Kafka topic and delivers."""
def __init__(self, channel, kafka_consumer, dlq_producer,
analytics):
self.channel = channel
self.consumer = kafka_consumer
self.dlq_producer = dlq_producer
self.analytics = analytics
async def process(self):
"""Main consumer loop."""
async for msg in self.consumer:
notification = json.loads(msg.value)
# Check if notification has expired
if self._is_expired(notification):
await self.analytics.record("expired",
notification)
continue
try:
result = await self._deliver(notification)
if result["success"]:
await self.analytics.record("delivered",
notification)
await self._handle_success(notification,
result)
else:
await self._handle_failure(notification,
result)
except Exception as e:
await self._handle_failure(
notification, {"error": str(e)})
async def _handle_failure(self, notification, result):
"""Handle delivery failure — retry or send to DLQ."""
attempt = notification["attempt"]
max_retries = notification["max_retries"]
if attempt < max_retries:
# Schedule retry with exponential backoff
delay = calculate_backoff(attempt)
notification["attempt"] = attempt + 1
notification["last_error"] = result.get("error")
notification["next_retry_at"] = time.time() + delay
# Publish back to the same topic with delay
# (Use Kafka headers or a delayed retry topic)
retry_topic = f"notifications.{self.channel}.retry"
await self._schedule_retry(retry_topic,
notification, delay)
await self.analytics.record("retry_scheduled", {
**notification,
"attempt": attempt + 1,
"delay_seconds": delay,
})
else:
# Max retries exceeded → Dead Letter Queue
await self.dlq_producer.send(
"notifications.dlq",
key=notification["user_id"].encode(),
value=json.dumps({
**notification,
"final_error": result.get("error"),
"failed_at": time.time(),
"total_attempts": attempt + 1,
}).encode()
)
await self.analytics.record("dlq", notification)
async def _handle_success(self, notification, result):
"""Handle token updates or other post-delivery actions."""
if result.get("remove_token"):
# APNs returned 410 — remove expired device token
await self._remove_device_token(
notification["user_id"],
notification.get("device_token"))
def _is_expired(self, notification):
expires = notification.get("expires_at")
return expires and time.time() > expires
Dead Letter Queue (DLQ)
Notifications that fail after all retry attempts are sent to a Dead Letter Queue. A separate DLQ processor handles these for manual review, alerting, and analysis.
class DLQProcessor:
"""Processes permanently failed notifications."""
async def process(self, notification):
# 1. Store in database for audit trail
await self.db.insert("failed_notifications", {
"notification_id": notification["notification_id"],
"user_id": notification["user_id"],
"channel": notification["channel"],
"error": notification["final_error"],
"attempts": notification["total_attempts"],
"created_at": notification["created_at"],
"failed_at": notification["failed_at"],
})
# 2. Try fallback channel if configured
if notification.get("fallback_channels"):
fallback = notification["fallback_channels"].pop(0)
await self.notification_service.resend(
notification, channel=fallback)
# 3. Alert if failure rate exceeds threshold
error_rate = await self._get_error_rate(
notification["channel"], window_minutes=5)
if error_rate > 0.05: # > 5% failure rate
await self.alerting.fire(
severity="critical",
message=f"{notification['channel']} failure "
f"rate {error_rate:.1%} exceeds 5%"
)
Deduplication
In a distributed system with retries and at-least-once delivery semantics, duplicate notifications are inevitable. A user receiving the same "Your order has shipped!" notification three times is a terrible experience. Deduplication gives effectively exactly-once delivery from the user's perspective.
Dedup Strategy
class NotificationDeduplicator:
"""Prevents duplicate notification delivery using Redis."""
DEDUP_TTL = 86400 # 24 hours
def __init__(self, redis):
self.redis = redis
async def is_duplicate(self, notification_id, channel):
"""Check if this notification was already sent."""
key = f"dedup:{notification_id}:{channel}"
# SETNX returns False if key already exists
is_new = await self.redis.set(
key, "1", nx=True, ex=self.DEDUP_TTL)
return not is_new # True if duplicate
async def mark_sent(self, notification_id, channel):
"""Explicitly mark a notification as sent."""
key = f"dedup:{notification_id}:{channel}"
await self.redis.set(key, "1", ex=self.DEDUP_TTL)
# Usage in the notification pipeline:
async def process_notification(notification):
dedup = NotificationDeduplicator(redis)
# Check for duplicate
if await dedup.is_duplicate(
notification["notification_id"],
notification["channel"]):
logger.info(f"Duplicate detected: "
f"{notification['notification_id']}")
return # Skip — already sent
# Proceed with delivery
result = await deliver(notification)
if result["success"]:
# Delivery confirmed — dedup key already set
pass
else:
# Delivery failed — remove dedup key so retry works
await redis.delete(
f"dedup:{notification['notification_id']}:"
f"{notification['channel']}"
)
notification_id is provided by the caller as an idempotency key. If the same service accidentally sends the same event twice (e.g., due to its own retry logic), the notification system catches it at the dedup layer. This is crucial because producers in microservice architectures also use at-least-once delivery.
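For the dedup layer to catch producer-side retries, the idempotency key must be deterministic: the same event must always map to the same notification_id. One common approach is hashing the event's identity (the `ntf_` prefix and 12-character digest here are illustrative choices, not a prescribed format):

```python
import hashlib

def make_notification_id(event_type, entity_id, event_version):
    """Derive a deterministic idempotency key on the producer side,
    so a retried event maps to the same notification_id and is
    caught by the dedup check."""
    raw = f"{event_type}:{entity_id}:{event_version}"
    return "ntf_" + hashlib.sha256(raw.encode()).hexdigest()[:12]

a = make_notification_id("ORDER_SHIPPED", "ORD-12345", 1)
b = make_notification_id("ORDER_SHIPPED", "ORD-12345", 1)
assert a == b   # a producer retry yields the same key
```

Including a version (or timestamp bucket) in the key matters: without it, a legitimately repeated event ("order shipped" twice for a split shipment) would be silently dropped as a duplicate.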
Analytics & Tracking
A notification you can't measure is a notification you can't improve. Comprehensive analytics track the full lifecycle of every notification from creation to user interaction.
Event Tracking Pipeline
// Notification lifecycle events
enum NotificationEvent {
CREATED, // Notification request received
VALIDATED, // Passed validation & dedup
PREFERENCE_CHECKED, // User prefs evaluated
RENDERED, // Template rendered
QUEUED, // Published to Kafka
SENT, // Delivered to third-party API
DELIVERED, // Confirmed delivery (APNs/FCM ack)
OPENED, // User opened notification (email pixel)
CLICKED, // User clicked CTA / deep link
DISMISSED, // User dismissed without action
UNSUBSCRIBED, // User opted out from this notification
FAILED, // Delivery failed
RETRY, // Scheduled for retry
DLQ, // Sent to dead letter queue
EXPIRED, // TTL expired before delivery
}
// Events are published to a Kafka analytics topic
// and consumed by a ClickHouse/Druid pipeline for
// real-time dashboards
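The events on that analytics topic can be kept deliberately small — one flat JSON row per lifecycle transition, ready for ClickHouse ingestion. A sketch of an envelope builder; the field names are assumptions for illustration, not a fixed schema:

```python
import json
import time
import uuid

def build_analytics_event(notification_id: str, channel: str,
                          event: str, **attrs) -> str:
    """Serialize one lifecycle event for the analytics topic."""
    envelope = {
        "event_id": str(uuid.uuid4()),     # unique per event row
        "notification_id": notification_id,
        "channel": channel,
        "event": event,                    # e.g. "SENT", "DELIVERED"
        "ts_ms": int(time.time() * 1000),  # event time for queries
        **attrs,                           # optional extras (latency, error codes)
    }
    return json.dumps(envelope)

# One DELIVERED event for a push notification:
msg = build_analytics_event("ntf_8f3a2b", "push", "DELIVERED",
                            latency_ms=340)
```

Keeping the envelope flat (no nesting) makes the columnar store's job easy: each key maps directly to a ClickHouse column.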
-- Analytics queries (ClickHouse)
-- Delivery rate by channel (last 24h)
SELECT
channel,
countIf(event = 'SENT') AS sent,
countIf(event = 'DELIVERED') AS delivered,
countIf(event = 'FAILED') AS failed,
round(delivered / sent * 100, 2) AS delivery_rate_pct
FROM notification_events
WHERE created_at > now() - INTERVAL 24 HOUR
GROUP BY channel;
-- Result:
-- channel | sent     | delivered | failed  | delivery_rate_pct
-- push    | 58234109 | 57067427  | 1166682 | 98.00
-- email   | 29102344 | 28229273  | 873071  | 97.00
-- sms     |  9872340 |  9773617  |  98723  | 99.00
-- Open rates by notification type (email)
SELECT
notification_type,
countIf(event = 'DELIVERED') AS delivered,
countIf(event = 'OPENED') AS opened,
countIf(event = 'CLICKED') AS clicked,
round(opened / delivered * 100, 2) AS open_rate,
round(clicked / opened * 100, 2) AS click_rate
FROM notification_events
WHERE channel = 'email'
AND created_at > now() - INTERVAL 7 DAY
GROUP BY notification_type
ORDER BY open_rate DESC;
-- Delivery latency percentiles: time from CREATED to DELIVERED
SELECT
channel,
priority,
quantile(0.50)(delivery_latency_ms) AS p50_ms,
quantile(0.95)(delivery_latency_ms) AS p95_ms,
quantile(0.99)(delivery_latency_ms) AS p99_ms
FROM (
SELECT
channel, priority,
dateDiff('millisecond', created_at, delivered_at)
AS delivery_latency_ms
FROM notification_deliveries
WHERE delivered_at > now() - INTERVAL 1 HOUR
)
GROUP BY channel, priority;
Email Open & Click Tracking
# Email tracking implementation
import re
import time
import urllib.parse

# 1. Open tracking — invisible 1×1 pixel
def generate_tracking_pixel(notification_id):
    """Build the tracking pixel to embed in the email HTML body."""
    pixel_url = (
        f"https://track.example.com/open"
        f"?nid={notification_id}"
        f"&t={int(time.time())}"
    )
    return (f'<img src="{pixel_url}" width="1" height="1" '
            f'alt="" style="display:none" />')

# 2. Click tracking — rewrite URLs through tracking proxy
def rewrite_links(html_body, notification_id):
    """Replace links with tracked redirect URLs."""
    def replace_url(match):
        original_url = match.group(1)
        tracked_url = (
            f"https://track.example.com/click"
            f"?nid={notification_id}"
            f"&url={urllib.parse.quote(original_url)}"
        )
        return f'href="{tracked_url}"'
    return re.sub(r'href="(https?://[^"]+)"',
                  replace_url, html_body)

# 3. Push notification engagement — deep link tracking
# Tracked on the client side via SDK:
# - App open from notification → OPENED event
# - CTA button tap → CLICKED event with action_id
# - Swipe to dismiss → DISMISSED event
End-to-End Flow
Let's trace a complete notification through the system. A user's order has just shipped:
Step 1: Order Service publishes event
→ OrderService.publish("ORDER_SHIPPED", {
user_id: "u_alice",
order_id: "ORD-12345",
carrier: "FedEx",
eta: "April 17"
})
Step 2: Notification Service receives the request
→ Validates payload schema ✓
→ Resolves notification_id: "ntf_8f3a2b" (caller-supplied idempotency key)
→ Dedup check: is "ntf_8f3a2b" in Redis? → No → proceed
Step 3: Check user preferences
→ PreferenceService.get_enabled_channels("u_alice",
"ORDER_SHIPPED")
→ Alice has: push ✓, email ✓, sms ✗ (opted out)
→ Not in quiet hours ✓
Step 4: Render templates
→ TemplateEngine.render("tmpl_order_shipped_push_en",
{order_id: "ORD-12345", carrier: "FedEx", ...})
→ Push: title="Your order is on the way! 📦",
body="Order #ORD-12345 shipped via FedEx..."
→ TemplateEngine.render("tmpl_order_shipped_email_en", ...)
→ Email: subject="Your order #ORD-12345 has shipped!"
Step 5: Rate limiting
→ RateLimiter.check("u_alice", "push") → 8/30 → allowed ✓
→ RateLimiter.check("u_alice", "email") → 2/10 → allowed ✓
Step 6: Route to priority queues
→ ORDER_SHIPPED → priority: "medium"
→ Kafka: notifications.push.medium ← push notification
→ Kafka: notifications.email.medium ← email notification
Step 7: Channel workers consume and deliver
→ Push Worker reads from notifications.push.medium
→ Fetches device tokens: [ios_token_abc, android_token_xyz]
→ APNs: POST /3/device/ios_token_abc → 200 OK ✓
→ FCM: POST messages:send → 200 OK ✓
→ Email Worker reads from notifications.email.medium
→ SendGrid: POST /v3/mail/send → 202 Accepted ✓
Step 8: Analytics events recorded
→ CREATED → VALIDATED → PREFERENCE_CHECKED → RENDERED
→ QUEUED → SENT → DELIVERED (push, 340ms)
→ QUEUED → SENT → DELIVERED (email, 1.2s)
Scaling Considerations
Kafka Partitioning Strategy
# Partition by user_id for ordering guarantees
# Notifications for the same user land on the same partition
# → Prevents out-of-order delivery for the same user
# But: hot users (influencers) can cause partition hotspots
# Solution: compound key = user_id + notification_type
# (keeps ordering within each type while spreading a hot
#  user's traffic across partitions)
partition_key = f"{user_id}:{notification_type}"
# Topic configuration
notifications.push.high:
partitions: 24 # More partitions = more parallelism
replication_factor: 3 # Survive 2 broker failures
retention: 72h # Keep for debugging
min.insync.replicas: 2
notifications.email.low:
partitions: 12
replication_factor: 3
retention: 48h
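To see why the compound key helps, here is a quick sketch that approximates key-based partition selection with a stable hash. Kafka's default partitioner actually uses murmur2; md5 is used here purely as an illustration:

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition with a stable hash.

    Illustrative only: Kafka's default partitioner hashes the key
    with murmur2, not md5, but the load-spreading behavior is the
    same idea.
    """
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# One hot user keyed by user_id alone: every message hits the
# same partition. With the compound key, each notification_type
# hashes independently, so the hot user's traffic can spread:
types = ["ORDER_SHIPPED", "NEW_FOLLOWER", "MARKETING_DIGEST"]
plain = {pick_partition("u_influencer", 24) for _ in types}
compound = {pick_partition(f"u_influencer:{t}", 24) for t in types}
assert len(plain) == 1  # user_id alone → one hot partition
```

The trade-off: messages of different types for the same user may now be processed out of order relative to each other, which is usually acceptable since ordering only matters within a notification type.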
Worker Auto-Scaling
# Auto-scale workers based on consumer lag
# When lag exceeds threshold, scale up; when cleared, scale down
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: push-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: push-worker-high
minReplicas: 4
maxReplicas: 50
metrics:
- type: External
external:
metric:
name: kafka_consumer_lag
selector:
matchLabels:
topic: notifications.push.high
target:
type: AverageValue
averageValue: "1000" # Scale up if lag > 1000/pod
Third-Party Provider Failover
class PushWorker(ChannelWorker):
    """Push worker with multi-provider failover."""

    def __init__(self, apns_client, fcm_client, **kwargs):
        super().__init__("push", **kwargs)
        self.apns = apns_client
        self.fcm = fcm_client

    async def _deliver(self, notification):
        platform = notification.get("platform")
        token = notification.get("device_token")
        content = notification["rendered_content"]
        if platform == "ios":
            # Primary: APNs
            success, result = await self.apns.send(
                token, self._build_apns_payload(content))
            if not success and result.get("error") == "SERVICE_DOWN":
                # Failover: queue for later retry
                return {"success": False, "error": "APNS_DOWN",
                        "retryable": True}
            return {"success": success, **result}
        elif platform == "android":
            # Primary: FCM
            success, result = await self.fcm.send(
                token, self._build_fcm_payload(content))
            return {"success": success, **result}
        elif platform == "web":
            # Web Push Protocol
            return await self._send_web_push(notification, content)
        # Unknown platform — nothing to retry
        return {"success": False, "error": "UNKNOWN_PLATFORM",
                "retryable": False}
Monitoring & Alerting
# Key metrics to monitor (Prometheus/Grafana)

# 1. Delivery success rate by channel
notification_delivery_total{channel="push", status="success"}
notification_delivery_total{channel="push", status="failed"}

# Alert: delivery rate drops below 95%
- alert: LowDeliveryRate
  expr: |
    rate(notification_delivery_total{status="success"}[5m]) /
    rate(notification_delivery_total[5m]) < 0.95
  for: 2m
  labels:
    severity: critical

# 2. End-to-end latency (creation to delivery), histogram buckets
notification_latency_seconds_bucket{channel, priority, le}

# Alert: P99 latency for high-priority exceeds 2s
- alert: HighPriorityLatency
  expr: |
    histogram_quantile(0.99,
      rate(notification_latency_seconds_bucket{priority="high"}[5m])) > 2
  for: 1m
  labels:
    severity: warning

# 3. Consumer lag (queue depth)
kafka_consumer_lag{topic, consumer_group}

# Alert: lag growing consistently (workers falling behind)
- alert: ConsumerLagGrowing
  expr: |
    delta(kafka_consumer_lag[10m]) > 10000
  for: 5m
  labels:
    severity: warning

# 4. Rate limiting events (are we dropping too many?)
notification_rate_limited_total{user_type, channel}

# 5. DLQ depth (permanently failed notifications)
notification_dlq_depth

# Alert: DLQ growing — indicates systemic issue
- alert: DLQDepthHigh
  expr: notification_dlq_depth > 10000
  for: 5m
  labels:
    severity: critical
Summary
| Design Decision | Choice | Rationale |
|---|---|---|
| Message Queue | Kafka (per channel × priority) | High throughput, ordering, replay capability |
| Preference Cache | Redis with 5-min TTL | Read-heavy; preferences rarely change |
| Rate Limiting | Redis sorted sets (sliding window) | Multi-level limits; security types exempt |
| Retry Strategy | Exponential backoff + full jitter | Prevents thundering herd on provider recovery |
| Deduplication | Redis SETNX with 24h TTL | At-least-once semantics need exactly-once UX |
| Analytics Store | ClickHouse (via Kafka) | Columnar OLAP for real-time aggregations |
| Worker Scaling | K8s HPA on consumer lag | Auto-scale workers to match notification volume |