Multiprocessing


📖 Concept

The multiprocessing module spawns separate OS processes, each with its own Python interpreter and GIL. This is Python's primary mechanism for achieving true CPU parallelism on multi-core machines.

Why multiprocessing over threading for CPU work: Since each process has its own GIL, multiple processes can execute Python bytecode truly in parallel across CPU cores. The trade-off is higher overhead: process creation is slower than thread creation, and inter-process communication requires serialization (pickling).

Key components:

  • Process: spawns a single child process
  • Pool: manages a pool of worker processes
  • Queue: thread- and process-safe FIFO queue for IPC
  • Pipe: two-way communication channel between two processes
  • Value / Array: ctypes-backed shared memory for simple types
  • Manager: proxy-based shared objects (dict, list, Namespace) across processes
  • Lock / Semaphore: process-level synchronization primitives
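Manager is the one component above not shown in the code section below, so here is a minimal sketch (function and variable names are illustrative): two child processes write to a Manager-proxied dict. Note that proxy access is slower than Value/Array or SharedMemory, because every operation is a round-trip to the manager's server process.

```python
import multiprocessing as mp


def record_length(shared: dict, word: str) -> None:
    """Store each word's length in the Manager-proxied dict."""
    shared[word] = len(word)


def collect_lengths(words: list[str]) -> dict[str, int]:
    """Fan words out to child processes that write to a shared dict."""
    with mp.Manager() as manager:
        lengths = manager.dict()  # proxy object, usable from any process
        procs = [
            mp.Process(target=record_length, args=(lengths, w))
            for w in words
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return dict(lengths)  # copy out before the manager shuts down


if __name__ == "__main__":
    print(collect_lengths(["spam", "eggs", "python"]))
```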

Process start methods:

  • fork (the long-standing Unix default; Python 3.14 switches the Linux default to forkserver) — Fast: copies the parent's memory via copy-on-write. Unsafe when the parent has running threads.
  • spawn (default on Windows/macOS since 3.8) — Starts a fresh interpreter, imports the module. Slower but safer. Requires if __name__ == "__main__": guard.
  • forkserver — Hybrid: a server process forks workers. Safer than fork with threads.
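Rather than relying on the platform default, the start method can be selected explicitly. set_start_method may only be called once per program, so get_context is the safer choice inside libraries; a sketch:

```python
import multiprocessing as mp


def greet() -> None:
    print("hello from", mp.current_process().name)


if __name__ == "__main__":
    # Option 1: set the process-wide default (allowed only once, early in main)
    # mp.set_start_method("spawn")

    # Option 2: a context object scopes the start-method choice to this code path
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=greet)
    p.start()
    p.join()

    print(mp.get_all_start_methods())  # methods available on this platform
```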

When to use multiprocessing vs threading:

  • CPU-bound (math, image processing, ML inference, compression): multiprocessing
  • I/O-bound (network, disk, database): threading or asyncio
  • Mixed: use a process pool for CPU work, with threading or asyncio within each process for I/O

Shared memory (Python 3.8+): multiprocessing.shared_memory.SharedMemory provides a block of memory accessible by all processes without pickling. Ideal for large arrays, NumPy buffers, or any scenario where serialization overhead is prohibitive.

💻 Code Example

# ============================================================
# Basic Process creation
# ============================================================
import multiprocessing as mp
import os
import time
import math
from typing import Any


def cpu_intensive_task(n: int) -> float:
    """Simulate CPU-bound work: compute sum of square roots."""
    return sum(math.sqrt(i) for i in range(n))


def worker(task_id: int, n: int, result_queue: mp.Queue) -> None:
    """Worker that puts its result on a queue."""
    pid = os.getpid()
    print(f"Worker {task_id} started (PID={pid})")
    result = cpu_intensive_task(n)
    result_queue.put((task_id, result))
    print(f"Worker {task_id} finished (PID={pid})")


if __name__ == "__main__":
    # Required guard for 'spawn' start method (default on macOS/Windows)

    result_queue = mp.Queue()
    processes = []

    start = time.perf_counter()
    for i in range(4):
        p = mp.Process(target=worker, args=(i, 5_000_000, result_queue))
        processes.append(p)
        p.start()

    # Drain the queue BEFORE joining: a child does not exit until its
    # queued items have been flushed, so join-before-get can deadlock
    # on larger payloads.
    results = [result_queue.get() for _ in range(4)]

    for p in processes:
        p.join()

    elapsed = time.perf_counter() - start

    for task_id, result in sorted(results):
        print(f"Task {task_id}: {result:.2f}")
    print(f"Total time: {elapsed:.2f}s (4 processes)")


# ============================================================
# Pool for parallel map/starmap
# ============================================================
def process_chunk(data: list[int]) -> list[int]:
    """CPU-bound processing of a data chunk."""
    return [x * x + math.isqrt(x) for x in data]


if __name__ == "__main__":
    data = list(range(10_000_000))
    chunk_size = len(data) // mp.cpu_count()
    chunks = [
        data[i : i + chunk_size]
        for i in range(0, len(data), chunk_size)
    ]

    # Pool.map distributes chunks across worker processes
    start = time.perf_counter()
    with mp.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(process_chunk, chunks)
    elapsed = time.perf_counter() - start

    total_items = sum(len(r) for r in results)
    print(f"Processed {total_items:,} items in {elapsed:.2f}s")
    print(f"Using {mp.cpu_count()} cores")


# ============================================================
# Pool with imap_unordered for streaming results
# ============================================================
def analyze_file(filepath: str) -> dict[str, Any]:
    """Analyze a single file (simulated work)."""
    time.sleep(0.1)
    return {
        "file": filepath,
        "size": len(filepath) * 100,
        "checksum": hash(filepath) % 10**8,
    }


if __name__ == "__main__":
    filepaths = [f"/data/file_{i:04d}.csv" for i in range(50)]

    with mp.Pool(processes=8) as pool:
        # imap_unordered yields results as they complete (not in order)
        for result in pool.imap_unordered(analyze_file, filepaths, chunksize=5):
            print(f"Completed: {result['file']} ({result['size']} bytes)")


# ============================================================
# Shared memory with Value and Array
# ============================================================
def counter_worker(
    shared_counter,  # mp.Value('i', ...)
    lock,            # mp.Lock()
    increments: int,
) -> None:
    """Increment a shared counter safely."""
    for _ in range(increments):
        with lock:
            shared_counter.value += 1


if __name__ == "__main__":
    counter = mp.Value("i", 0)  # 'i' = signed int, initial value 0
    lock = mp.Lock()

    procs = [
        mp.Process(target=counter_worker, args=(counter, lock, 100_000))
        for _ in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print(f"Shared counter: {counter.value}")  # Always 400_000


# ============================================================
# SharedMemory for large data (Python 3.8+)
# ============================================================
from multiprocessing import shared_memory
import struct


def create_shared_buffer(name: str, data: list[float]) -> None:
    """Create a shared memory block with float data."""
    fmt = f"{len(data)}d"  # 'd' = double (float64)
    size = struct.calcsize(fmt)

    shm = shared_memory.SharedMemory(name=name, create=True, size=size)
    struct.pack_into(fmt, shm.buf, 0, *data)
    print(f"Created shared memory '{name}' ({size} bytes)")
    shm.close()  # Close local handle (memory persists)


def read_shared_buffer(name: str, count: int) -> list[float]:
    """Read float data from existing shared memory."""
    shm = shared_memory.SharedMemory(name=name, create=False)
    fmt = f"{count}d"
    data = list(struct.unpack_from(fmt, shm.buf, 0))
    shm.close()
    return data


if __name__ == "__main__":
    BUFFER_NAME = "my_shared_data"
    sample = [1.1, 2.2, 3.3, 4.4, 5.5]

    create_shared_buffer(BUFFER_NAME, sample)

    result = read_shared_buffer(BUFFER_NAME, len(sample))
    print(f"Read from shared memory: {result}")

    # Cleanup: unlink removes the shared memory block
    shm = shared_memory.SharedMemory(name=BUFFER_NAME, create=False)
    shm.close()
    shm.unlink()


# ============================================================
# Pipe for two-way communication
# ============================================================
from multiprocessing.connection import Connection


def pipe_worker(conn: Connection) -> None:
    """Worker process that communicates via Pipe."""
    while True:
        msg = conn.recv()
        if msg == "STOP":
            conn.send("GOODBYE")
            break
        conn.send(f"Processed: {msg.upper()}")
    conn.close()


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()

    p = mp.Process(target=pipe_worker, args=(child_conn,))
    p.start()

    for word in ["hello", "world", "python"]:
        parent_conn.send(word)
        print(parent_conn.recv())

    parent_conn.send("STOP")
    print(parent_conn.recv())  # "GOODBYE"
    p.join()

🏋️ Practice Exercise

Exercises:

  1. Write a script that computes the sum of prime numbers up to 10,000,000 using multiprocessing.Pool. Split the range into chunks (one per CPU core), compute primes in each chunk in parallel, and combine results. Compare the speedup against a single-process version.

  2. Implement a parallel file hasher: given a list of 100 file paths, compute their SHA-256 hashes using a Pool with imap_unordered. Display a progress bar (using a counter and total). Handle FileNotFoundError gracefully per file without crashing the pool.

  3. Build a multi-process pipeline using Queue: Process A reads lines from a large CSV, Process B parses and transforms rows, Process C writes results to a new file. Use sentinel values to signal completion. Measure throughput.

  4. Create a shared NumPy array using multiprocessing.shared_memory and have 4 worker processes each write to their own slice. Verify the combined result in the parent process without any pickling overhead.

  5. Write a benchmark comparing fork vs spawn start methods. Measure process creation time, memory usage, and total execution time for a pool of 8 workers. Explain the results.

  6. Implement a parallel web scraper: a main process puts URLs into a Queue, 4 worker processes fetch and parse pages (using requests + BeautifulSoup), and put parsed results into an output Queue. The main process collects and saves results. Add graceful shutdown with a poison-pill pattern.

⚠️ Common Mistakes

  • Forgetting the if __name__ == '__main__': guard. On spawn start method (default on macOS/Windows), the child process re-imports the module. Without the guard, it tries to create new processes recursively, causing RuntimeError or infinite process spawning.

  • Passing unpicklable objects (lambdas, open file handles, database connections, sockets) to worker processes. Everything sent via Queue, Pipe, or Pool must be picklable. Use module-level functions instead of lambdas, and create resources inside the worker.
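Picklability can be checked up front with pickle itself; a module-level function round-trips, while a lambda fails (a small sketch):

```python
import pickle


def double(x: int) -> int:
    """Module-level function: picklable by its qualified name."""
    return x * 2


# Round-trips fine: pickle stores the function's importable name
restored = pickle.loads(pickle.dumps(double))
print(restored(21))  # 42

try:
    pickle.dumps(lambda x: x * 2)  # no importable name -> cannot pickle
except (pickle.PicklingError, AttributeError) as exc:
    print(f"lambda not picklable: {exc}")
```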

  • Creating too many processes. Spawning 1,000 processes on an 8-core machine wastes memory and causes excessive context switching. Match the pool size to os.cpu_count() for CPU-bound work, or slightly above for mixed workloads.

  • Using multiprocessing for I/O-bound tasks. The process creation and IPC overhead far exceeds any benefit. Use threading or asyncio for I/O-bound work — reserve multiprocessing for CPU-bound computation.

  • Not properly cleaning up shared memory. SharedMemory blocks persist after the process exits. Failing to call shm.unlink() leaves orphaned memory blocks in the OS. Always unlink from exactly one process (typically the creator) using try/finally.
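A cleanup pattern for the last point, with try/finally guaranteeing unlink even if an exception occurs (a sketch; the block name is auto-generated here):

```python
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=64)
try:
    shm.buf[:5] = b"hello"     # write into the block
    print(bytes(shm.buf[:5]))  # read it back
finally:
    shm.close()   # every process closes its own handle
    shm.unlink()  # exactly one process (the creator) unlinks
```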
