concurrent.futures
📖 Concept
The concurrent.futures module provides a high-level, unified interface for executing callables asynchronously using either threads or processes. It abstracts away the complexity of managing threads/processes directly and provides a clean Executor / Future API.
Why use concurrent.futures:
- Simpler than raw threading/multiprocessing — no manual thread creation, joining, or queue management
- Unified API — swap between `ThreadPoolExecutor` and `ProcessPoolExecutor` by changing one line
- Future objects — represent pending results, support callbacks, cancellation, and exception retrieval
- Context manager — executors are context managers that handle shutdown automatically
Key components:
| Component | Purpose |
|---|---|
| `ThreadPoolExecutor` | Pool of threads for I/O-bound work |
| `ProcessPoolExecutor` | Pool of processes for CPU-bound work |
| `Future` | Represents a pending computation result |
| `submit(fn, *args)` | Submit a single callable, returns a `Future` |
| `map(fn, *iterables)` | Apply a function to iterables in parallel (like built-in `map`) |
| `as_completed(futures)` | Iterator yielding futures as they complete (not in submission order) |
| `wait(futures)` | Block until futures complete, with options for first-completed or all-completed |
`submit()` vs `map()`:
- `submit()` returns a `Future` for each call — gives you fine-grained control (callbacks, cancellation, exception handling per task)
- `map()` returns an iterator of results in input order — simpler API but less control; it raises the first exception encountered.
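A minimal sketch of the contrast, using a hypothetical `square` function:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n: int) -> int:  # toy function, for illustration only
    return n * n

with ThreadPoolExecutor(max_workers=4) as ex:
    # map(): a lazy iterator of results, always in input order
    mapped = list(ex.map(square, [1, 2, 3]))
    print(mapped)  # [1, 4, 9]

    # submit(): one Future per call; completion order may vary,
    # but each future can be inspected, cancelled, or handled individually
    futures = [ex.submit(square, n) for n in [1, 2, 3]]
    results = sorted(f.result() for f in as_completed(futures))
    print(results)  # [1, 4, 9]
```

Both produce the same values here; the difference shows up when tasks fail or finish at very different speeds.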
`as_completed()` is one of the most useful patterns: it yields futures in the order they finish, not the order they were submitted. This is ideal for processing results as soon as they're available, showing progress, or implementing timeouts.
Future lifecycle:
PENDING → RUNNING → FINISHED (result or exception) or CANCELLED
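The lifecycle can be observed directly; a small sketch, where the `slow_task` helper and the timings are illustrative assumptions:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task() -> str:  # illustrative helper
    time.sleep(0.2)
    return "done"

with ThreadPoolExecutor(max_workers=1) as ex:
    future = ex.submit(slow_task)   # PENDING, soon RUNNING
    print(future.done())            # False: not yet FINISHED
    result = future.result()        # blocks until FINISHED
    print(future.done())            # True

    # Only a still-PENDING future can be cancelled
    running = ex.submit(slow_task)  # picked up by the single worker
    queued = ex.submit(slow_task)   # waits in the queue behind it
    time.sleep(0.05)                # give `running` time to start
    print(queued.cancel())          # True: never started, now CANCELLED
    print(running.cancel())         # False: already RUNNING
```

This is also why "cancel remaining tasks" patterns are best-effort: futures that have already started cannot be cancelled.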
The module integrates seamlessly with asyncio via loop.run_in_executor(), which wraps a concurrent.futures executor for use in async code. This is the standard pattern for running blocking or CPU-bound code inside an asyncio application.
💻 Code Example
```python
# ============================================================
# ThreadPoolExecutor for I/O-bound work
# ============================================================
from concurrent.futures import (
    ThreadPoolExecutor,
    ProcessPoolExecutor,
    as_completed,
    wait,
    Future,
    FIRST_COMPLETED,
    ALL_COMPLETED,
)
import time
import hashlib
import math
import logging
from typing import Any
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def fetch_url(url: str) -> dict[str, Any]:
    """Simulate fetching a URL (I/O-bound)."""
    time.sleep(0.3 + len(url) % 5 * 0.1)  # Variable latency
    return {"url": url, "status": 200, "size": len(url) * 100}


# --- Using submit() with as_completed() ---
def fetch_all_urls(urls: list[str], max_workers: int = 10) -> list[dict]:
    """Fetch URLs concurrently, processing results as they arrive."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks and map futures to their URLs
        future_to_url = {
            executor.submit(fetch_url, url): url for url in urls
        }

        # Process results as they complete (NOT in submission order)
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()  # Get the return value
                results.append(result)
                logger.info(f"Completed: {url} ({result['size']} bytes)")
            except Exception as exc:
                logger.error(f"Failed: {url} - {exc}")

    return results


urls = [f"https://api.example.com/page/{i}" for i in range(15)]
start = time.perf_counter()
results = fetch_all_urls(urls, max_workers=5)
elapsed = time.perf_counter() - start
print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")


# --- Using map() for simpler parallel mapping ---
def process_record(record: dict) -> dict:
    """Transform a data record (I/O-bound simulation)."""
    time.sleep(0.1)
    return {**record, "processed": True, "hash": hashlib.md5(
        str(record).encode()
    ).hexdigest()[:8]}


records = [{"id": i, "value": i * 10} for i in range(20)]

with ThreadPoolExecutor(max_workers=8) as executor:
    # map() returns results in INPUT order (unlike as_completed)
    start = time.perf_counter()
    processed = list(executor.map(process_record, records))
    elapsed = time.perf_counter() - start

print(f"Processed {len(processed)} records in {elapsed:.2f}s")


# ============================================================
# ProcessPoolExecutor for CPU-bound work
# ============================================================
def compute_primes(start: int, end: int) -> list[int]:
    """Find all primes in range [start, end) -- CPU-bound."""
    primes = []
    for n in range(max(2, start), end):
        if all(n % d != 0 for d in range(2, int(math.sqrt(n)) + 1)):
            primes.append(n)
    return primes


def parallel_prime_count(limit: int, num_workers: int = 4) -> int:
    """Count primes up to limit using multiple processes."""
    chunk_size = limit // num_workers
    ranges = [
        (i * chunk_size, (i + 1) * chunk_size)
        for i in range(num_workers)
    ]
    # Ensure we cover the full range
    ranges[-1] = (ranges[-1][0], limit)

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(compute_primes, start, end)
            for start, end in ranges
        ]

        total_primes = []
        for future in as_completed(futures):
            chunk_primes = future.result()
            total_primes.extend(chunk_primes)
            print(f"Chunk done: found {len(chunk_primes)} primes")

    return len(total_primes)


if __name__ == "__main__":
    start = time.perf_counter()
    count = parallel_prime_count(500_000, num_workers=4)
    elapsed = time.perf_counter() - start
    print(f"Found {count} primes in {elapsed:.2f}s")


# ============================================================
# Future callbacks and exception handling
# ============================================================
def risky_operation(task_id: int) -> str:
    """Operation that may fail."""
    time.sleep(0.2)
    if task_id % 3 == 0:
        raise ValueError(f"Task {task_id} failed: bad input")
    return f"Task {task_id} succeeded"


def on_complete(future: Future) -> None:
    """Callback invoked when a future completes."""
    if future.exception():
        logger.error(f"Callback: error - {future.exception()}")
    else:
        logger.info(f"Callback: {future.result()}")


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for i in range(10):
        future = executor.submit(risky_operation, i)
        future.add_done_callback(on_complete)  # Non-blocking notification
        futures.append(future)

    # Wait for all to complete and handle exceptions
    for future in as_completed(futures):
        try:
            result = future.result(timeout=5.0)
        except ValueError as e:
            print(f"Handled error: {e}")
        except TimeoutError:
            print("Task timed out")


# ============================================================
# wait() for first-completed pattern
# ============================================================
def search_engine(query: str, engine: str) -> dict:
    """Simulate searching different engines with varying speeds."""
    delays = {"google": 0.3, "bing": 0.5, "duckduckgo": 0.8}
    time.sleep(delays.get(engine, 1.0))
    return {"engine": engine, "query": query, "results": 42}


def search_first_result(query: str) -> dict:
    """Return result from whichever search engine responds first."""
    engines = ["google", "bing", "duckduckgo"]

    with ThreadPoolExecutor(max_workers=len(engines)) as executor:
        futures = {
            executor.submit(search_engine, query, eng): eng
            for eng in engines
        }

        # Wait for FIRST completed future
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)

        # Get the first result
        first_future = done.pop()
        result = first_future.result()
        engine = futures[first_future]
        print(f"First result from {engine}")

        # Cancel remaining futures (best effort)
        for f in not_done:
            f.cancel()

    return result


if __name__ == "__main__":
    result = search_first_result("python concurrency")
    print(f"Result: {result}")


# ============================================================
# Switching between Thread and Process executors
# ============================================================
def get_executor(task_type: str, max_workers: int = 4):
    """Factory: choose executor based on workload type."""
    if task_type == "io":
        return ThreadPoolExecutor(max_workers=max_workers)
    elif task_type == "cpu":
        return ProcessPoolExecutor(max_workers=max_workers)
    else:
        raise ValueError(f"Unknown task type: {task_type}")


def generic_parallel_map(func, items, task_type="io", max_workers=4):
    """Run func over items in parallel, choosing executor by task type."""
    with get_executor(task_type, max_workers) as executor:
        results = list(executor.map(func, items))
    return results


# ============================================================
# Integration with asyncio
# ============================================================
import asyncio


def blocking_io_operation(filepath: str) -> str:
    """Blocking file read (cannot be made async natively)."""
    time.sleep(0.1)  # Simulate slow I/O
    return f"Contents of {filepath}"


async def async_main() -> None:
    """Use run_in_executor to call blocking code from async."""
    loop = asyncio.get_running_loop()

    # Run blocking functions concurrently in thread pool
    with ThreadPoolExecutor(max_workers=4) as pool:
        tasks = [
            loop.run_in_executor(pool, blocking_io_operation, f"file_{i}.txt")
            for i in range(10)
        ]
        results = await asyncio.gather(*tasks)

    for r in results:
        print(f" {r}")


if __name__ == "__main__":
    asyncio.run(async_main())
```
🏋️ Practice Exercise
Exercises:
1. Write a parallel file downloader using `ThreadPoolExecutor`. Given a list of 30 URLs, download them with `max_workers=8`. Use `as_completed()` to print a progress bar (`[=====>     ] 15/30`). Handle individual download failures without crashing other downloads.
2. Implement a parallel image processor using `ProcessPoolExecutor`: given 20 image file paths, apply a CPU-bound transformation (e.g., convert to grayscale by averaging RGB values). Use `map()` and compare execution time vs single-process. Plot speedup vs number of workers.
3. Build a "fastest mirror" selector: submit the same download request to 5 mirror URLs concurrently using `submit()`. Use `wait(return_when=FIRST_COMPLETED)` to get the fastest response. Cancel remaining tasks. Add retry logic if the first result is an error.
4. Create a generic `parallel_batch_processor(items, func, batch_size, max_workers, executor_type)` that splits items into batches, processes each batch in parallel, collects results with `as_completed()`, and supports both thread and process executors. Add a timeout per batch.
5. Write an `async def process_files(paths)` that uses `loop.run_in_executor()` with a `ThreadPoolExecutor` to read files concurrently inside an asyncio application. Compare performance against purely synchronous file reading for 100 small files and 10 large files.
6. Implement a task scheduler that accepts jobs with priorities. Use `ThreadPoolExecutor.submit()` and `Future` callbacks to chain dependent tasks (task B runs only after task A succeeds). Add cancellation support: cancelling a parent task cancels all dependent children.
⚠️ Common Mistakes
- Not handling exceptions from `Future.result()`. If the submitted function raises, calling `future.result()` re-raises the exception. Without try/except, one failed task crashes the consumer loop. Always wrap `result()` calls in exception handling.
- Using `ProcessPoolExecutor` for I/O-bound work. Process creation and IPC overhead make it much slower than `ThreadPoolExecutor` for network or file I/O. Reserve process pools for CPU-bound computation.
- Calling `future.result()` immediately after `submit()`, which blocks the caller until that specific task completes. This defeats the purpose of concurrency. Use `as_completed()` to process results as they arrive, or `gather()` in async code.
- Creating a new executor for every function call instead of reusing one. Executor creation has overhead (spawning threads/processes). Create one executor and reuse it, or use it as a context manager at the appropriate scope.
- Forgetting that `executor.map()` raises the first exception and silently abandons remaining items. If you need all results (including partial failures), use `submit()` + `as_completed()` with per-future exception handling instead.
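The last pitfall can be seen in a few lines; a minimal sketch, where the `parse` helper and its inputs are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def parse(value: str) -> int:  # toy helper: fails on non-numeric input
    return int(value)

items = ["1", "2", "oops", "4"]

with ThreadPoolExecutor(max_workers=2) as ex:
    # map(): iteration raises at the bad item; "4" is never yielded
    collected = []
    try:
        for n in ex.map(parse, items):
            collected.append(n)
    except ValueError:
        pass
    print(collected)  # [1, 2]

    # submit() + as_completed(): every outcome is observable
    futures = [ex.submit(parse, item) for item in items]
    ok, failed = [], 0
    for fut in as_completed(futures):
        try:
            ok.append(fut.result())
        except ValueError:
            failed += 1
    print(sorted(ok), failed)  # [1, 2, 4] 1
```

With `map()` the partial results before the failure are recoverable only if you iterate manually; the submit-based version sees all four outcomes.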