System Design with Python


📖 Concept

System design interviews assess your ability to architect scalable, reliable, and maintainable distributed systems. Python plays a significant role in backend services, data pipelines, and microservice architectures — understanding how to design systems with Python tooling is essential for senior-level interviews.

Key system design components and Python ecosystem:

Component | Python Tools | Purpose
--- | --- | ---
Web Framework | FastAPI, Django, Flask | HTTP API layer
Task Queue | Celery, Dramatiq, Huey | Async background processing
Message Broker | RabbitMQ (pika), Redis (redis-py), Kafka (confluent-kafka) | Decoupling services
Caching | Redis (redis-py), functools.lru_cache, Memcached | Reduce latency and DB load
Database | SQLAlchemy, Django ORM, Tortoise ORM | Relational data access
Search | Elasticsearch (elasticsearch-py) | Full-text search
Monitoring | Prometheus (prometheus-client), Sentry, OpenTelemetry | Observability

Microservices architecture with Python:

  • Service decomposition — break monoliths into domain-bounded services (user service, order service, notification service)
  • API Gateway — use Kong, Traefik, or a custom FastAPI gateway to route, authenticate, and rate-limit requests
  • Inter-service communication — synchronous (HTTP/gRPC) vs asynchronous (message queues). Prefer asynchronous messaging for workflows that do not need an immediate response
  • Service discovery — Consul, etcd, or Kubernetes DNS for locating service instances dynamically
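The gateway's core routing responsibility can be sketched in plain Python. This is a toy illustration — the service names and path prefixes are made up, and a real deployment would use Kong, Traefik, or a FastAPI app with auth and rate limiting:

```python
# Minimal sketch of an API-gateway routing table (hypothetical services).
class Gateway:
    """Route request paths to backend services by longest-prefix match."""

    def __init__(self):
        self.routes = {}  # path prefix -> service name

    def register(self, prefix, service):
        self.routes[prefix] = service

    def route(self, path):
        # Longest prefix wins, so /orders/items can override /orders
        matches = [p for p in self.routes if path.startswith(p)]
        if not matches:
            return None
        return self.routes[max(matches, key=len)]

gw = Gateway()
gw.register("/users", "user-service")
gw.register("/orders", "order-service")
print(gw.route("/users/42"))   # user-service
print(gw.route("/orders/9"))   # order-service
```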

Caching strategies:

  • Cache-aside (lazy loading) — application checks cache first, populates on miss
  • Write-through — write to cache and DB simultaneously
  • Write-behind — write to cache immediately, flush to DB asynchronously
  • TTL-based expiry — set expiration to prevent stale data
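The first two strategies can be sketched with plain dicts standing in for Redis (cache) and a relational database (db); the key names are illustrative:

```python
cache, db = {}, {"user:1": {"name": "Alice"}}

def get_cache_aside(key):
    """Cache-aside: check the cache first, populate it on a miss."""
    if key in cache:
        return cache[key]
    value = db.get(key)        # cache miss -> read from DB
    if value is not None:
        cache[key] = value     # populate cache for next time
    return value

def set_write_through(key, value):
    """Write-through: update cache and DB together."""
    db[key] = value
    cache[key] = value

print(get_cache_aside("user:1"))   # miss -> DB read, then cached
print("user:1" in cache)           # True
set_write_through("user:2", {"name": "Bob"})
print(get_cache_aside("user:2"))   # served straight from the cache
```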

Load balancing and horizontal scaling:

  • Run multiple instances of a Python service behind Nginx or HAProxy
  • Use Gunicorn with multiple worker processes (gunicorn -w 4); for async FastAPI apps, run Uvicorn workers (gunicorn -k uvicorn.workers.UvicornWorker)
  • Stateless services scale horizontally — store session data in Redis, not in-process memory
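The distribution logic behind Nginx or HAProxy can be illustrated with a toy round-robin balancer (the instance addresses are made up):

```python
import itertools

class RoundRobinBalancer:
    """Cycle requests across stateless service instances in order."""

    def __init__(self, instances):
        self._cycle = itertools.cycle(instances)

    def next_instance(self):
        return next(self._cycle)

lb = RoundRobinBalancer(["app-1:8000", "app-2:8000", "app-3:8000"])
print([lb.next_instance() for _ in range(6)])
# ['app-1:8000', 'app-2:8000', 'app-3:8000', 'app-1:8000', 'app-2:8000', 'app-3:8000']
```

Because the services are stateless, it does not matter which instance handles a given request.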

Python in distributed systems leverages asyncio for high-concurrency I/O-bound services, Celery for distributed task processing, and gRPC (grpcio) for efficient inter-service communication with Protocol Buffers.
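The asyncio concurrency win for I/O-bound calls can be shown in a few lines, with asyncio.sleep standing in for network latency (the service names are placeholders):

```python
import asyncio
import time

async def fetch(name, delay=0.1):
    await asyncio.sleep(delay)     # stands in for an HTTP or DB call
    return f"{name}: done"

async def main():
    start = time.perf_counter()
    # Three 0.1s "calls" overlap, so total time stays near 0.1s, not 0.3s
    results = await asyncio.gather(*(fetch(f"svc-{i}") for i in range(3)))
    elapsed = time.perf_counter() - start
    print(results)
    print(f"elapsed ~{elapsed:.2f}s")

asyncio.run(main())
```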

💻 Code Example

# ============================================================
# System Design Components in Python
# ============================================================
import bisect
import hashlib
import time
from collections import OrderedDict
from enum import Enum


# --- Component 1: LRU Cache with TTL (Cache-Aside Pattern) ---
class TTLCache:
    """LRU cache with per-key TTL expiration.

    Not thread-safe; guard calls with a threading.Lock if shared
    across threads.
    """

    def __init__(self, capacity=1000, default_ttl=300):
        self.capacity = capacity
        self.default_ttl = default_ttl
        self._cache = OrderedDict()  # key -> (value, expire_at)

    def get(self, key):
        """Cache-aside read: returns value or None if miss/expired."""
        if key not in self._cache:
            return None
        value, expire_at = self._cache[key]
        if time.time() > expire_at:
            del self._cache[key]  # expired
            return None
        self._cache.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value, ttl=None):
        """Set key with optional custom TTL."""
        ttl = ttl or self.default_ttl
        if key in self._cache:
            self._cache.move_to_end(key)
        self._cache[key] = (value, time.time() + ttl)
        if len(self._cache) > self.capacity:
            self._cache.popitem(last=False)  # evict least recently used

    def invalidate(self, key):
        """Explicit cache invalidation."""
        self._cache.pop(key, None)

    def stats(self):
        """Return cache statistics."""
        now = time.time()
        active = sum(1 for _, exp in self._cache.values() if exp > now)
        return {"total_keys": len(self._cache), "active_keys": active}


# --- Component 2: Consistent Hashing (Load Balancing) ---
class ConsistentHashRing:
    """Consistent hashing for distributing keys across server nodes."""

    def __init__(self, replicas=150):
        self.replicas = replicas
        self.ring = {}  # hash -> node name
        self.sorted_keys = []

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a server node with virtual replicas."""
        for i in range(self.replicas):
            h = self._hash(f"{node}:replica-{i}")
            self.ring[h] = node
            self.sorted_keys.append(h)
        self.sorted_keys.sort()

    def remove_node(self, node):
        """Remove a node and all its replicas."""
        for i in range(self.replicas):
            h = self._hash(f"{node}:replica-{i}")
            if self.ring.pop(h, None) is not None:
                self.sorted_keys.remove(h)

    def get_node(self, key):
        """Find which node a key maps to (first ring point clockwise)."""
        if not self.ring:
            return None
        h = self._hash(key)
        idx = bisect.bisect_left(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around
        return self.ring[self.sorted_keys[idx]]


# --- Component 3: Rate Limiter (Token Bucket) ---
class TokenBucketRateLimiter:
    """Per-client rate limiting using the token bucket algorithm."""

    def __init__(self, capacity=10, refill_rate=1.0):
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second
        self.buckets = {}  # client_id -> (tokens, last_refill)

    def allow_request(self, client_id):
        """Check whether a request from client_id should be allowed."""
        now = time.time()
        if client_id not in self.buckets:
            self.buckets[client_id] = (self.capacity - 1, now)
            return True

        tokens, last_refill = self.buckets[client_id]
        # Refill tokens based on elapsed time
        elapsed = now - last_refill
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        if tokens >= 1:
            self.buckets[client_id] = (tokens - 1, now)
            return True
        self.buckets[client_id] = (tokens, now)
        return False


# --- Component 4: Simple Message Queue (Producer/Consumer) ---
class MessageQueue:
    """In-memory message queue with topic-based pub/sub."""

    def __init__(self):
        self.topics = {}  # topic -> list of pending messages
        self.subscribers = {}  # topic -> list of callback functions

    def create_topic(self, topic):
        if topic not in self.topics:
            self.topics[topic] = []
            self.subscribers[topic] = []

    def publish(self, topic, message):
        """Publish a message to a topic."""
        if topic not in self.topics:
            self.create_topic(topic)
        envelope = {
            "id": hashlib.sha256(
                f"{topic}-{time.time()}".encode()
            ).hexdigest()[:12],
            "topic": topic,
            "payload": message,
            "timestamp": time.time(),
        }
        self.topics[topic].append(envelope)
        # Notify push-style subscribers
        for callback in self.subscribers[topic]:
            callback(envelope)
        return envelope["id"]

    def subscribe(self, topic, callback):
        """Register a callback for messages on a topic."""
        if topic not in self.subscribers:
            self.create_topic(topic)
        self.subscribers[topic].append(callback)

    def consume(self, topic, count=1):
        """Pull messages from a topic (destructive read)."""
        if topic not in self.topics:
            return []
        messages = self.topics[topic][:count]
        self.topics[topic] = self.topics[topic][count:]
        return messages


# --- Component 5: Circuit Breaker Pattern ---
class CircuitState(Enum):
    CLOSED = "closed"  # normal operation
    OPEN = "open"  # failing, reject requests
    HALF_OPEN = "half_open"  # testing if service recovered


class CircuitBreaker:
    """Prevent cascading failures in microservice communication."""

    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0

    def call(self, func, *args, **kwargs):
        """Execute a function through the circuit breaker."""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker is OPEN — request rejected")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN


# --- Demo ---
if __name__ == "__main__":
    # Cache demo
    cache = TTLCache(capacity=3, default_ttl=10)
    cache.set("user:1", {"name": "Alice"})
    print(cache.get("user:1"))  # {'name': 'Alice'}
    print(cache.stats())

    # Consistent hashing demo
    ring = ConsistentHashRing()
    for node in ["server-a", "server-b", "server-c"]:
        ring.add_node(node)
    for key in ["user:100", "user:200", "order:500"]:
        print(f"{key} -> {ring.get_node(key)}")

    # Rate limiter demo
    limiter = TokenBucketRateLimiter(capacity=3, refill_rate=1.0)
    for i in range(5):
        print(f"Request {i+1}: {limiter.allow_request('client-1')}")

🏋️ Practice Exercise

Exercises:

  1. Design a URL shortener service: implement the core logic with a URLShortener class that generates short codes (base62 encoding), stores mappings in a dict (simulating Redis), handles collisions, and tracks click analytics. Include methods for shorten(url), resolve(short_code), and get_stats(short_code).

  2. Extend the TTLCache to support write-through and write-behind strategies. For write-through, accept a persist_fn callback that writes to a simulated database on every set(). For write-behind, batch writes and flush every N seconds using a background thread.

  3. Implement a distributed task queue simulator: create TaskProducer and TaskWorker classes. Producers enqueue tasks with priorities. Workers pull from the queue using the ConsistentHashRing to assign tasks to specific workers. Add retry logic with exponential backoff for failed tasks.

  4. Build a rate limiter middleware for a FastAPI application using the sliding window log algorithm. Store request timestamps per client IP in a dict. Return HTTP 429 when the limit is exceeded. Write tests that simulate burst traffic.

  5. Design a notification system: implement NotificationService that supports email, SMS, and push channels. Use the message queue pattern with topic-based routing. Add the circuit breaker pattern for each external provider so one failing provider does not block others.

⚠️ Common Mistakes

  • Designing everything as synchronous request-response. Many operations (sending emails, processing images, generating reports) should be offloaded to background task queues like Celery to keep API response times fast.

  • Ignoring caching — hitting the database on every request for data that rarely changes. Apply cache-aside with TTL for frequently read, rarely written data like user profiles, product catalogs, or configuration.

  • Using Python's Global Interpreter Lock (GIL) as an excuse to avoid concurrency. The GIL only affects CPU-bound threads. For I/O-bound work (HTTP calls, DB queries, file reads), asyncio or multi-threading provides significant speedups.

  • Not considering failure modes in distributed systems. Every network call can fail — implement retries with exponential backoff, circuit breakers for downstream services, and timeouts on all external requests.

  • Storing session state in application memory, which breaks horizontal scaling. Use Redis or a database for shared state so any instance can handle any request.
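The retry-with-exponential-backoff advice above can be sketched as follows (delays are shortened so the example runs quickly; production code would also add jitter and an overall deadline):

```python
import time

def retry(func, attempts=4, base_delay=0.01):
    """Call func, retrying with exponentially growing delays on failure."""
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * (2 ** attempt))  # 0.01, 0.02, 0.04...

calls = {"n": 0}

def flaky():
    """Simulated downstream call that fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(retry(flaky))  # ok (succeeded on the third attempt)
print(calls["n"])    # 3
```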
