System Design with Python
📖 Concept
System design interviews assess your ability to architect scalable, reliable, and maintainable distributed systems. Python plays a significant role in backend services, data pipelines, and microservice architectures — understanding how to design systems with Python tooling is essential for senior-level interviews.
Key system design components and Python ecosystem:
| Component | Python Tools | Purpose |
|---|---|---|
| Web Framework | FastAPI, Django, Flask | HTTP API layer |
| Task Queue | Celery, Dramatiq, Huey | Async background processing |
| Message Broker | RabbitMQ (pika), Redis (redis-py), Kafka (confluent-kafka) | Decoupling services |
| Caching | Redis (redis-py), functools.lru_cache, Memcached | Reduce latency and DB load |
| Database | SQLAlchemy, Django ORM, Tortoise ORM | Relational data access |
| Search | Elasticsearch (elasticsearch-py) | Full-text search |
| Monitoring | Prometheus (prometheus-client), Sentry, OpenTelemetry | Observability |
Microservices architecture with Python:
- Service decomposition — break monoliths into domain-bounded services (user service, order service, notification service)
- API Gateway — use Kong, Traefik, or a custom FastAPI gateway to route, authenticate, and rate-limit requests
- Inter-service communication — synchronous (HTTP/gRPC) vs asynchronous (message queues). Prefer async for non-blocking workflows
- Service discovery — Consul, etcd, or Kubernetes DNS for locating service instances dynamically
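To make the service-discovery idea concrete, here is a toy in-memory registry sketching what Consul, etcd, or Kubernetes DNS provide: instances register themselves, send heartbeats, and lookups return only instances seen recently. The `ServiceRegistry` class, its method names, and the timeout value are all illustrative, not a real Consul/etcd API.

```python
import random
import time


class ServiceRegistry:
    """Toy in-memory service registry (sketch of what Consul/etcd provide).

    Instances register and heartbeat; resolve() returns only instances
    whose last heartbeat is recent. All names here are illustrative.
    """

    def __init__(self, heartbeat_timeout=15):
        self.heartbeat_timeout = heartbeat_timeout
        self._instances = {}  # service -> {address: last_heartbeat}

    def register(self, service, address):
        self._instances.setdefault(service, {})[address] = time.time()

    def heartbeat(self, service, address):
        if address in self._instances.get(service, {}):
            self._instances[service][address] = time.time()

    def resolve(self, service):
        """Return one healthy instance (random choice = naive load balancing)."""
        cutoff = time.time() - self.heartbeat_timeout
        alive = [a for a, t in self._instances.get(service, {}).items() if t > cutoff]
        return random.choice(alive) if alive else None


registry = ServiceRegistry()
registry.register("user-service", "10.0.0.1:8000")
registry.register("user-service", "10.0.0.2:8000")
print(registry.resolve("user-service"))  # one of the two addresses
```

Real registries add health checks, leases, and watch/notify semantics; the core contract — register, heartbeat, resolve — is the same.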
Caching strategies:
- Cache-aside (lazy loading) — application checks cache first, populates on miss
- Write-through — write to cache and DB simultaneously
- Write-behind — write to cache immediately, flush to DB asynchronously
- TTL-based expiry — set expiration to prevent stale data
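Write-through, for example, can be sketched in a few lines. The `WriteThroughCache` class below is a minimal illustration, not a production cache; `persist_fn` is a hypothetical callback standing in for a real database write.

```python
class WriteThroughCache:
    """Minimal write-through sketch: every set() updates cache and 'DB' together.

    persist_fn is a stand-in for a real database write (e.g. an UPDATE).
    """

    def __init__(self, persist_fn):
        self._store = {}
        self._persist_fn = persist_fn

    def set(self, key, value):
        self._persist_fn(key, value)  # write DB first; raises on failure
        self._store[key] = value      # cache updated only if persist succeeded

    def get(self, key):
        return self._store.get(key)


fake_db = {}
cache = WriteThroughCache(persist_fn=lambda k, v: fake_db.__setitem__(k, v))
cache.set("user:1", {"name": "Alice"})
print(fake_db["user:1"])  # {'name': 'Alice'} — cache and DB stay consistent
```

Writing the DB before the cache means a failed persist never leaves the cache holding data the database lost — the trade-off is higher write latency than write-behind.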
Load balancing and horizontal scaling:
- Run multiple instances of a Python service behind Nginx or HAProxy
- Use Gunicorn with multiple workers (`gunicorn -w 4`) or Uvicorn for async FastAPI
- Stateless services scale horizontally — store session data in Redis, not in-process memory
Python in distributed systems leverages asyncio for high-concurrency I/O-bound services, Celery for distributed task processing, and gRPC (grpcio) for efficient inter-service communication with Protocol Buffers.
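As a quick illustration of why asyncio suits I/O-bound services, the sketch below fires three simulated downstream calls concurrently with `asyncio.gather`; the "fetch" just sleeps to stand in for network latency, and the service names are made up.

```python
import asyncio
import time


async def fetch(service, delay):
    """Simulate a network call to a downstream service."""
    await asyncio.sleep(delay)
    return f"{service}: ok"


async def main():
    start = time.perf_counter()
    # Three 0.1s "network calls" run concurrently, not sequentially
    results = await asyncio.gather(
        fetch("user-service", 0.1),
        fetch("order-service", 0.1),
        fetch("notification-service", 0.1),
    )
    print(f"elapsed: {time.perf_counter() - start:.2f}s")  # ~0.1s, not 0.3s
    return results


results = asyncio.run(main())
print(results)  # ['user-service: ok', 'order-service: ok', 'notification-service: ok']
```

The total wall time is roughly the slowest call, not the sum — the same reason an async API gateway can fan out to several backend services without tripling its latency.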
💻 Code Example
```python
# ============================================================
# System Design Components in Python
# ============================================================
import hashlib
import time
from collections import OrderedDict
from enum import Enum


# --- Component 1: LRU Cache with TTL (Cache-Aside Pattern) ---
class TTLCache:
    """LRU cache with per-key TTL expiration.

    Not thread-safe as written; wrap calls in a lock for concurrent use.
    """

    def __init__(self, capacity=1000, default_ttl=300):
        self.capacity = capacity
        self.default_ttl = default_ttl
        self._cache = OrderedDict()  # key -> (value, expire_at)

    def get(self, key):
        """Cache-aside read: returns value or None if miss/expired."""
        if key not in self._cache:
            return None
        value, expire_at = self._cache[key]
        if time.time() > expire_at:
            del self._cache[key]  # expired
            return None
        # Move to end (most recently used)
        self._cache.move_to_end(key)
        return value

    def set(self, key, value, ttl=None):
        """Set key with optional custom TTL."""
        ttl = ttl or self.default_ttl
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = (value, time.time() + ttl)
        if len(self._cache) > self.capacity:
            self._cache.popitem(last=False)  # evict LRU

    def invalidate(self, key):
        """Explicit cache invalidation."""
        self._cache.pop(key, None)

    def stats(self):
        """Return cache statistics."""
        now = time.time()
        active = sum(1 for _, (_, exp) in self._cache.items() if exp > now)
        return {"total_keys": len(self._cache), "active_keys": active}


# --- Component 2: Consistent Hashing (Load Balancing) ---
class ConsistentHashRing:
    """Consistent hashing for distributing keys across server nodes."""

    def __init__(self, replicas=150):
        self.replicas = replicas
        self.ring = {}  # hash -> node name
        self.sorted_keys = []

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a server node with virtual replicas."""
        for i in range(self.replicas):
            virtual_key = f"{node}:replica-{i}"
            h = self._hash(virtual_key)
            self.ring[h] = node
            self.sorted_keys.append(h)
        self.sorted_keys.sort()

    def remove_node(self, node):
        """Remove a node and all its replicas."""
        for i in range(self.replicas):
            virtual_key = f"{node}:replica-{i}"
            h = self._hash(virtual_key)
            self.ring.pop(h, None)
            if h in self.sorted_keys:
                self.sorted_keys.remove(h)

    def get_node(self, key):
        """Find which node a key maps to."""
        if not self.ring:
            return None
        h = self._hash(key)
        for ring_key in self.sorted_keys:
            if h <= ring_key:
                return self.ring[ring_key]
        return self.ring[self.sorted_keys[0]]  # wrap around


# --- Component 3: Rate Limiter (Token Bucket) ---
class TokenBucketRateLimiter:
    """Per-client rate limiting using the token bucket algorithm."""

    def __init__(self, capacity=10, refill_rate=1.0):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.buckets = {}  # client_id -> (tokens, last_refill)

    def allow_request(self, client_id):
        """Check if a request from client_id should be allowed."""
        now = time.time()
        if client_id not in self.buckets:
            self.buckets[client_id] = (self.capacity - 1, now)
            return True

        tokens, last_refill = self.buckets[client_id]
        # Refill tokens based on elapsed time
        elapsed = now - last_refill
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        if tokens >= 1:
            self.buckets[client_id] = (tokens - 1, now)
            return True
        else:
            self.buckets[client_id] = (tokens, now)
            return False


# --- Component 4: Simple Message Queue (Producer/Consumer) ---
class MessageQueue:
    """In-memory message queue with topic-based pub/sub."""

    def __init__(self):
        self.topics = {}  # topic -> list of messages
        self.subscribers = {}  # topic -> list of callback functions

    def create_topic(self, topic):
        if topic not in self.topics:
            self.topics[topic] = []
            self.subscribers[topic] = []

    def publish(self, topic, message):
        """Publish a message to a topic."""
        if topic not in self.topics:
            self.create_topic(topic)
        envelope = {
            "id": hashlib.sha256(
                f"{topic}-{time.time()}".encode()
            ).hexdigest()[:12],
            "topic": topic,
            "payload": message,
            "timestamp": time.time(),
        }
        self.topics[topic].append(envelope)
        # Notify subscribers
        for callback in self.subscribers[topic]:
            callback(envelope)
        return envelope["id"]

    def subscribe(self, topic, callback):
        """Register a callback for messages on a topic."""
        if topic not in self.subscribers:
            self.create_topic(topic)
        self.subscribers[topic].append(callback)

    def consume(self, topic, count=1):
        """Pull messages from a topic (destructive read)."""
        if topic not in self.topics:
            return []
        messages = self.topics[topic][:count]
        self.topics[topic] = self.topics[topic][count:]
        return messages


# --- Component 5: Circuit Breaker Pattern ---
class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # failing, reject requests
    HALF_OPEN = "half_open"  # testing if service recovered


class CircuitBreaker:
    """Prevent cascading failures in microservice communication."""

    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        """Execute function through the circuit breaker."""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN — request rejected")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


# --- Demo ---
if __name__ == "__main__":
    # Cache demo
    cache = TTLCache(capacity=3, default_ttl=10)
    cache.set("user:1", {"name": "Alice"})
    print(cache.get("user:1"))  # {'name': 'Alice'}
    print(cache.stats())

    # Consistent hashing demo
    ring = ConsistentHashRing()
    for node in ["server-a", "server-b", "server-c"]:
        ring.add_node(node)
    for key in ["user:100", "user:200", "order:500"]:
        print(f"{key} -> {ring.get_node(key)}")

    # Rate limiter demo
    limiter = TokenBucketRateLimiter(capacity=3, refill_rate=1.0)
    for i in range(5):
        print(f"Request {i+1}: {limiter.allow_request('client-1')}")
```
🏋️ Practice Exercise
Exercises:
1. Design a URL shortener service: implement the core logic with a `URLShortener` class that generates short codes (base62 encoding), stores mappings in a dict (simulating Redis), handles collisions, and tracks click analytics. Include methods for `shorten(url)`, `resolve(short_code)`, and `get_stats(short_code)`.
2. Extend the `TTLCache` to support write-through and write-behind strategies. For write-through, accept a `persist_fn` callback that writes to a simulated database on every `set()`. For write-behind, batch writes and flush every N seconds using a background thread.
3. Implement a distributed task queue simulator: create `TaskProducer` and `TaskWorker` classes. Producers enqueue tasks with priorities. Workers pull from the queue using the `ConsistentHashRing` to assign tasks to specific workers. Add retry logic with exponential backoff for failed tasks.
4. Build a rate limiter middleware for a FastAPI application using the sliding window log algorithm. Store request timestamps per client IP in a dict. Return HTTP 429 when the limit is exceeded. Write tests that simulate burst traffic.
5. Design a notification system: implement `NotificationService` that supports email, SMS, and push channels. Use the message queue pattern with topic-based routing. Add the circuit breaker pattern for each external provider so one failing provider does not block others.
⚠️ Common Mistakes
Designing everything as synchronous request-response. Many operations (sending emails, processing images, generating reports) should be offloaded to background task queues like Celery to keep API response times fast.
Ignoring caching — hitting the database on every request for data that rarely changes. Apply cache-aside with TTL for frequently read, rarely written data like user profiles, product catalogs, or configuration.
Using Python's Global Interpreter Lock (GIL) as an excuse to avoid concurrency. The GIL only affects CPU-bound threads. For I/O-bound work (HTTP calls, DB queries, file reads), `asyncio` or multi-threading provides significant speedups.
Not considering failure modes in distributed systems. Every network call can fail — implement retries with exponential backoff, circuit breakers for downstream services, and timeouts on all external requests.
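Retries with exponential backoff fit in a small decorator; the sketch below is one minimal way to do it, with arbitrary attempt counts and delays, and `flaky_call` as a made-up stand-in for a downstream request.

```python
import functools
import time


def retry(max_attempts=3, base_delay=0.1):
    """Retry decorator with exponential backoff (0.1s, 0.2s, 0.4s, ...)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # retries exhausted: propagate the error
                    time.sleep(base_delay * 2 ** attempt)
        return wrapper
    return decorator


calls = {"n": 0}


@retry(max_attempts=3, base_delay=0.01)
def flaky_call():
    """Fails twice, then succeeds — simulates a transient network error."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("downstream unavailable")
    return "ok"


print(flaky_call())  # ok (succeeds on the third attempt)
```

Production versions usually add jitter to the delay (to avoid synchronized retry storms) and retry only on error types known to be transient.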
Storing session state in application memory, which breaks horizontal scaling. Use Redis or a database for shared state so any instance can handle any request.