concurrent.futures


📖 Concept

The concurrent.futures module provides a high-level, unified interface for executing callables asynchronously using either threads or processes. It abstracts away the complexity of managing threads/processes directly and provides a clean Executor / Future API.

Why use concurrent.futures:

  • Simpler than raw threading/multiprocessing — no manual thread creation, joining, or queue management
  • Unified API — swap between ThreadPoolExecutor and ProcessPoolExecutor by changing one line
  • Future objects — represent pending results, support callbacks, cancellation, and exception retrieval
  • Context manager — executors are context managers that handle shutdown automatically

Key components:

  • ThreadPoolExecutor: pool of threads for I/O-bound work
  • ProcessPoolExecutor: pool of processes for CPU-bound work
  • Future: represents a pending computation result
  • submit(fn, *args): submits a single callable; returns a Future
  • map(fn, *iterables): applies a function to iterables in parallel (like built-in map)
  • as_completed(futures): iterator yielding futures as they complete (not in submission order)
  • wait(futures): blocks until futures complete, with FIRST_COMPLETED / ALL_COMPLETED options

submit() vs map():

  • submit() returns a Future for each call — gives you fine-grained control (callbacks, cancellation, exception handling per task)
  • map() returns an iterator of results in input order — simpler API, but less control. Iterating past a failed task re-raises its exception, and the remaining results are abandoned.
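The difference is easy to see with a toy workload (the `square` function and inputs below are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n: int) -> int:
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # map(): results arrive in INPUT order, no per-task control
    ordered = list(executor.map(square, [1, 2, 3, 4]))

    # submit(): one Future per call; as_completed() yields in COMPLETION order
    futures = [executor.submit(square, n) for n in [1, 2, 3, 4]]
    unordered = [f.result() for f in as_completed(futures)]

print(ordered)            # [1, 4, 9, 16] -- always input order
print(sorted(unordered))  # same values, but arrival order may differ
```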

as_completed() is one of the most useful patterns: it yields futures in the order they finish, not the order they were submitted. This is ideal for processing results as soon as they're available, showing progress, or implementing timeouts.

Future lifecycle: PENDING → RUNNING → FINISHED (result or exception), or CANCELLED if the future is cancelled before it starts running
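Those transitions can be observed directly on a Future (a short sleep stands in for real work):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task() -> str:
    time.sleep(0.2)  # Keeps the future PENDING/RUNNING long enough to observe
    return "done"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    print(future.done())    # False: not yet FINISHED
    print(future.result())  # Blocks until FINISHED, then returns "done"
    print(future.done())    # True
    print(future.cancel())  # False: a finished future cannot be cancelled
```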

The module integrates seamlessly with asyncio via loop.run_in_executor(), which wraps a concurrent.futures executor for use in async code. This is the standard pattern for running blocking or CPU-bound code inside an asyncio application.

💻 Code Example

# ============================================================
# ThreadPoolExecutor for I/O-bound work
# ============================================================
from concurrent.futures import (
    ThreadPoolExecutor,
    ProcessPoolExecutor,
    as_completed,
    wait,
    Future,
    FIRST_COMPLETED,
)
import time
import hashlib
import math
import logging
from typing import Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def fetch_url(url: str) -> dict[str, Any]:
    """Simulate fetching a URL (I/O-bound)."""
    time.sleep(0.3 + len(url) % 5 * 0.1)  # Variable latency
    return {"url": url, "status": 200, "size": len(url) * 100}


# --- Using submit() with as_completed() ---
def fetch_all_urls(urls: list[str], max_workers: int = 10) -> list[dict]:
    """Fetch URLs concurrently, processing results as they arrive."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks and map futures to their URLs
        future_to_url = {
            executor.submit(fetch_url, url): url for url in urls
        }

        # Process results as they complete (NOT in submission order)
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()  # Get the return value (re-raises on failure)
                results.append(result)
                logger.info(f"Completed: {url} ({result['size']} bytes)")
            except Exception as exc:
                logger.error(f"Failed: {url} - {exc}")

    return results


if __name__ == "__main__":  # Guard driver code: spawned worker processes re-import this module
    urls = [f"https://api.example.com/page/{i}" for i in range(15)]
    start = time.perf_counter()
    results = fetch_all_urls(urls, max_workers=5)
    elapsed = time.perf_counter() - start
    print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")

# --- Using map() for simpler parallel mapping ---
def process_record(record: dict) -> dict:
    """Transform a data record (I/O-bound simulation)."""
    time.sleep(0.1)
    return {**record, "processed": True, "hash": hashlib.md5(
        str(record).encode()
    ).hexdigest()[:8]}


if __name__ == "__main__":  # Guard driver code: spawned worker processes re-import this module
    records = [{"id": i, "value": i * 10} for i in range(20)]

    with ThreadPoolExecutor(max_workers=8) as executor:
        # map() returns results in INPUT order (unlike as_completed)
        start = time.perf_counter()
        processed = list(executor.map(process_record, records))
        elapsed = time.perf_counter() - start

    print(f"Processed {len(processed)} records in {elapsed:.2f}s")

# ============================================================
# ProcessPoolExecutor for CPU-bound work
# ============================================================
def compute_primes(start: int, end: int) -> list[int]:
    """Find all primes in range [start, end) -- CPU-bound."""
    primes = []
    for n in range(max(2, start), end):
        if all(n % d != 0 for d in range(2, int(math.sqrt(n)) + 1)):
            primes.append(n)
    return primes


def parallel_prime_count(limit: int, num_workers: int = 4) -> int:
    """Count primes up to limit using multiple processes."""
    chunk_size = limit // num_workers
    ranges = [
        (i * chunk_size, (i + 1) * chunk_size)
        for i in range(num_workers)
    ]
    # Ensure we cover the full range
    ranges[-1] = (ranges[-1][0], limit)

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(compute_primes, start, end)
            for start, end in ranges
        ]

        total_primes = []
        for future in as_completed(futures):
            chunk_primes = future.result()
            total_primes.extend(chunk_primes)
            print(f"Chunk done: found {len(chunk_primes)} primes")

    return len(total_primes)


if __name__ == "__main__":
    start = time.perf_counter()
    count = parallel_prime_count(500_000, num_workers=4)
    elapsed = time.perf_counter() - start
    print(f"Found {count} primes in {elapsed:.2f}s")

# ============================================================
# Future callbacks and exception handling
# ============================================================
def risky_operation(task_id: int) -> str:
    """Operation that may fail."""
    time.sleep(0.2)
    if task_id % 3 == 0:
        raise ValueError(f"Task {task_id} failed: bad input")
    return f"Task {task_id} succeeded"


def on_complete(future: Future) -> None:
    """Callback invoked when a future completes."""
    if future.exception():
        logger.error(f"Callback: error - {future.exception()}")
    else:
        logger.info(f"Callback: {future.result()}")


if __name__ == "__main__":  # Guard driver code: spawned worker processes re-import this module
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for i in range(10):
            future = executor.submit(risky_operation, i)
            future.add_done_callback(on_complete)  # Non-blocking notification
            futures.append(future)

        # Wait for all to complete and handle exceptions
        for future in as_completed(futures):
            try:
                result = future.result(timeout=5.0)
            except ValueError as e:
                print(f"Handled error: {e}")
            except TimeoutError:
                # On Python < 3.11, catch concurrent.futures.TimeoutError instead;
                # it only became an alias of the builtin TimeoutError in 3.11.
                print("Task timed out")

# ============================================================
# wait() for first-completed pattern
# ============================================================
def search_engine(query: str, engine: str) -> dict:
    """Simulate searching different engines with varying speeds."""
    delays = {"google": 0.3, "bing": 0.5, "duckduckgo": 0.8}
    time.sleep(delays.get(engine, 1.0))
    return {"engine": engine, "query": query, "results": 42}


def search_first_result(query: str) -> dict:
    """Return result from whichever search engine responds first."""
    engines = ["google", "bing", "duckduckgo"]

    with ThreadPoolExecutor(max_workers=len(engines)) as executor:
        futures = {
            executor.submit(search_engine, query, eng): eng
            for eng in engines
        }

        # Wait for FIRST completed future
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)

        # Get the first result
        first_future = done.pop()
        result = first_future.result()
        engine = futures[first_future]
        print(f"First result from {engine}")

        # Cancel remaining futures (best effort)
        for f in not_done:
            f.cancel()

    return result


if __name__ == "__main__":
    result = search_first_result("python concurrency")
    print(f"Result: {result}")

# ============================================================
# Switching between Thread and Process executors
# ============================================================
def get_executor(task_type: str, max_workers: int = 4):
    """Factory: choose executor based on workload type."""
    if task_type == "io":
        return ThreadPoolExecutor(max_workers=max_workers)
    elif task_type == "cpu":
        return ProcessPoolExecutor(max_workers=max_workers)
    else:
        raise ValueError(f"Unknown task type: {task_type}")


def generic_parallel_map(func, items, task_type="io", max_workers=4):
    """Run func over items in parallel, choosing executor by task type."""
    with get_executor(task_type, max_workers) as executor:
        results = list(executor.map(func, items))
    return results

# ============================================================
# Integration with asyncio
# ============================================================
import asyncio


def blocking_io_operation(filepath: str) -> str:
    """Blocking file read (cannot be made async natively)."""
    time.sleep(0.1)  # Simulate slow I/O
    return f"Contents of {filepath}"


async def async_main() -> None:
    """Use run_in_executor to call blocking code from async."""
    loop = asyncio.get_running_loop()

    # Run blocking functions concurrently in thread pool
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [
            loop.run_in_executor(pool, blocking_io_operation, f"file_{i}.txt")
            for i in range(10)
        ]
        results = await asyncio.gather(*tasks)

    for r in results:
        print(f" {r}")


if __name__ == "__main__":
    asyncio.run(async_main())

🏋️ Practice Exercise

Exercises:

  1. Write a parallel file downloader using ThreadPoolExecutor. Given a list of 30 URLs, download them with max_workers=8. Use as_completed() to print a progress bar ([=====> ] 15/30). Handle individual download failures without crashing other downloads.

  2. Implement a parallel image processor using ProcessPoolExecutor: given 20 image file paths, apply a CPU-bound transformation (e.g., convert to grayscale by averaging RGB values). Use map() and compare execution time vs single-process. Plot speedup vs number of workers.

  3. Build a "fastest mirror" selector: submit the same download request to 5 mirror URLs concurrently using submit(). Use wait(return_when=FIRST_COMPLETED) to get the fastest response. Cancel remaining tasks. Add retry logic if the first result is an error.

  4. Create a generic parallel_batch_processor(items, func, batch_size, max_workers, executor_type) that splits items into batches, processes each batch in parallel, collects results with as_completed(), and supports both thread and process executors. Add timeout per batch.

  5. Write an async def process_files(paths) that uses loop.run_in_executor() with a ThreadPoolExecutor to read files concurrently inside an asyncio application. Compare performance against purely synchronous file reading for 100 small files and 10 large files.

  6. Implement a task scheduler that accepts jobs with priorities. Use ThreadPoolExecutor.submit() and Future callbacks to chain dependent tasks (task B runs only after task A succeeds). Add cancellation support: cancelling a parent task cancels all dependent children.

⚠️ Common Mistakes

  • Not handling exceptions from Future.result(). If the submitted function raises, calling future.result() re-raises the exception. Without try/except, one failed task crashes the consumer loop. Always wrap result() calls in exception handling.

  • Using ProcessPoolExecutor for I/O-bound work. Process creation and IPC overhead makes it much slower than ThreadPoolExecutor for network or file I/O. Reserve process pools for CPU-bound computation.

  • Calling future.result() immediately after submit(), which blocks the caller until that specific task completes. This defeats the purpose of concurrency. Use as_completed() to process results as they arrive, or gather() in async code.
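The cost of this mistake is easy to measure with a toy task (the sleep stands in for real work; timings are approximate):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def task(n: int) -> int:
    time.sleep(0.2)
    return n

# Anti-pattern: result() immediately after submit() serializes the work (~0.8s)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    serial = [executor.submit(task, n).result() for n in range(4)]
serial_time = time.perf_counter() - start

# Fix: submit everything first, then collect results as tasks finish (~0.2s)
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(task, n) for n in range(4)]
    parallel = [f.result() for f in as_completed(futures)]
parallel_time = time.perf_counter() - start

print(f"serialized: {serial_time:.2f}s, concurrent: {parallel_time:.2f}s")
```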

  • Creating a new executor for every function call instead of reusing one. Executor creation has overhead (spawning threads/processes). Create one executor and reuse it, or use it as a context manager at the appropriate scope.

  • Forgetting that executor.map() raises the first exception and silently abandons remaining items. If you need all results (including partial failures), use submit() + as_completed() with per-future exception handling instead.
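A small sketch of both behaviors with one deliberately failing input (the `might_fail` helper is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def might_fail(n: int) -> int:
    if n == 2:
        raise ValueError("bad input: 2")
    return n * 10

items = [1, 2, 3]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map(): iteration raises at the failing item; item 3's result is lost
    try:
        list(executor.map(might_fail, items))
    except ValueError as exc:
        print(f"map() stopped early: {exc}")

    # submit() + as_completed(): every outcome is observed individually
    futures = [executor.submit(might_fail, n) for n in items]
    results, errors = [], []
    for future in as_completed(futures):
        try:
            results.append(future.result())
        except ValueError as exc:
            errors.append(str(exc))

print(sorted(results), errors)  # [10, 30] ['bad input: 2']
```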
