Asyncio (async/await)


📖 Concept

asyncio is Python's built-in framework for writing concurrent I/O-bound code using a single-threaded event loop. It uses async/await syntax to write asynchronous code that reads like synchronous code while handling thousands of concurrent operations efficiently.

Core concepts:

  • Coroutine — A function defined with async def. Calling it returns a coroutine object (not a result). Must be awaited or scheduled.
  • Event loop — The central scheduler that runs coroutines, handles I/O callbacks, and manages timers. Only one loop per thread.
  • await — Suspends the current coroutine and yields control back to the event loop until the awaited operation completes.
  • Task — A wrapper around a coroutine that schedules it for concurrent execution. Created with asyncio.create_task().
  • asyncio.gather() — Run multiple coroutines concurrently and collect all results.
  • asyncio.TaskGroup (3.11+) — Structured concurrency: a context manager that runs tasks and cancels all on first failure.
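A minimal sketch of the first two ideas (the `greet` function here is illustrative, not part of the lesson's code): calling an async def function only builds a coroutine object; nothing runs until an event loop drives it.

```python
import asyncio


async def greet(name: str) -> str:
    # Suspends here; the event loop is free to run other coroutines
    await asyncio.sleep(0.01)
    return f"hello, {name}"


coro = greet("world")       # Nothing has run yet: just a coroutine object
print(type(coro).__name__)  # -> coroutine
result = asyncio.run(coro)  # The loop drives it to completion
print(result)               # -> hello, world
```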

How it works (simplified):

1. Event loop picks a ready coroutine
2. Coroutine runs until it hits 'await' (I/O, sleep, etc.)
3. Coroutine is suspended, control returns to event loop
4. Event loop picks the next ready coroutine
5. When I/O completes, the suspended coroutine becomes ready again
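That hand-off is observable. In this toy sketch (names are illustrative), two coroutines both start before either finishes, because the loop switches between them at each await:

```python
import asyncio

order: list[str] = []


async def worker(tag: str, pause: float) -> None:
    order.append(f"{tag}:start")
    await asyncio.sleep(pause)  # Suspend; the loop runs the other worker
    order.append(f"{tag}:done")


async def main() -> None:
    # "a" sleeps longer, so "b" resumes and finishes first
    await asyncio.gather(worker("a", 0.02), worker("b", 0.01))


asyncio.run(main())
print(order)  # -> ['a:start', 'b:start', 'b:done', 'a:done']
```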

asyncio vs threading:

  • Concurrency model — asyncio: cooperative (switches only at explicit await); threading: preemptive (the OS switches anytime)
  • Context switches — asyncio: at await points (predictable); threading: anywhere (unpredictable)
  • Shared state safety — asyncio: no data races between await points (single thread); threading: requires locks
  • Overhead per task — asyncio: very low (~KB per task); threading: higher (~MB of stack per thread)
  • Best for — asyncio: many concurrent I/O operations (10K+); threading: moderate I/O concurrency, or CPU-bound work in C extensions that release the GIL

When to use asyncio: High-concurrency I/O scenarios — web servers, API gateways, chat servers, web scrapers, microservice communication. When you need to handle thousands of simultaneous connections with minimal resources.

Key libraries: aiohttp (HTTP client/server), asyncpg (PostgreSQL), aiomysql (MySQL), websockets, httpx (async HTTP client). For Redis, the old aioredis package has been merged into redis-py as redis.asyncio.

💻 Code Example

# ============================================================
# Basics: coroutines, await, and asyncio.run()
# ============================================================
import asyncio
import time
from typing import Any


async def fetch_data(url: str, delay: float) -> dict[str, Any]:
    """Simulate an async HTTP request."""
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Non-blocking sleep (simulates I/O)
    return {"url": url, "status": 200, "data": f"Response from {url}"}


async def main_sequential() -> None:
    """Sequential execution: each await blocks before the next starts."""
    start = time.perf_counter()

    r1 = await fetch_data("https://api.example.com/users", 1.0)
    r2 = await fetch_data("https://api.example.com/posts", 1.5)
    r3 = await fetch_data("https://api.example.com/comments", 0.8)

    elapsed = time.perf_counter() - start
    print(f"Sequential: {elapsed:.2f}s")  # ~3.3s (sum of delays)


async def main_concurrent() -> None:
    """Concurrent execution: all requests run simultaneously."""
    start = time.perf_counter()

    results = await asyncio.gather(
        fetch_data("https://api.example.com/users", 1.0),
        fetch_data("https://api.example.com/posts", 1.5),
        fetch_data("https://api.example.com/comments", 0.8),
    )

    elapsed = time.perf_counter() - start
    print(f"Concurrent: {elapsed:.2f}s")  # ~1.5s (max of delays)
    for r in results:
        print(f"  {r['url']}: {r['status']}")


asyncio.run(main_sequential())
asyncio.run(main_concurrent())


# ============================================================
# Tasks: create_task for fire-and-forget concurrency
# ============================================================
async def download_file(name: str, size_mb: float) -> str:
    """Simulate downloading a file."""
    chunks = int(size_mb * 10)
    for _ in range(chunks):
        await asyncio.sleep(0.05)  # Simulate chunk download
    return f"{name} ({size_mb}MB)"


async def main_tasks() -> None:
    """Create tasks for concurrent execution."""
    # create_task schedules coroutines immediately
    task1 = asyncio.create_task(download_file("report.pdf", 2.0))
    task2 = asyncio.create_task(download_file("data.csv", 1.5))
    task3 = asyncio.create_task(download_file("image.png", 0.5))

    # Do other work while downloads proceed
    print("Downloads started, doing other work...")
    await asyncio.sleep(0.1)

    # Await results when needed
    results = await asyncio.gather(task1, task2, task3)
    for r in results:
        print(f"  Downloaded: {r}")


asyncio.run(main_tasks())


# ============================================================
# TaskGroup for structured concurrency (Python 3.11+)
# ============================================================
async def process_item(item_id: int) -> dict:
    """Process a single item asynchronously."""
    await asyncio.sleep(0.1 * (item_id % 5))
    if item_id == 7:
        raise ValueError(f"Item {item_id} is invalid")
    return {"id": item_id, "processed": True}


async def main_taskgroup() -> None:
    """TaskGroup cancels all tasks if any one fails."""
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(process_item(i))
                for i in range(10)
            ]
        # If we get here, all tasks succeeded
        results = [t.result() for t in tasks]
        print(f"All {len(results)} items processed")
    except* ValueError as eg:
        # ExceptionGroup from TaskGroup (3.11+)
        for exc in eg.exceptions:
            print(f"Failed: {exc}")


# asyncio.run(main_taskgroup())  # Requires Python 3.11+


# ============================================================
# Semaphore for limiting concurrency
# ============================================================
async def rate_limited_fetch(
    sem: asyncio.Semaphore,
    url: str,
    session_id: int,
) -> dict:
    """Fetch with concurrency limit via semaphore."""
    async with sem:  # At most N concurrent fetches
        print(f"[{session_id}] Fetching {url}")
        await asyncio.sleep(0.5)  # Simulate request
        return {"url": url, "session": session_id, "status": 200}


async def main_rate_limited() -> None:
    """Fetch 20 URLs with at most 5 concurrent requests."""
    sem = asyncio.Semaphore(5)
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]

    tasks = [
        rate_limited_fetch(sem, url, i) for i, url in enumerate(urls)
    ]

    start = time.perf_counter()
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")
    # 20 URLs, 5 at a time, 0.5s each = ~2.0s


asyncio.run(main_rate_limited())


# ============================================================
# Async generators and async iteration
# ============================================================
async def async_range(start: int, stop: int, delay: float = 0.1):
    """Async generator that yields numbers with a delay."""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i


async def main_async_iter() -> None:
    """Consume async generators with 'async for'."""
    total = 0
    async for num in async_range(1, 11, delay=0.05):
        total += num
        print(f"  Received: {num}")
    print(f"Total: {total}")


asyncio.run(main_async_iter())


# ============================================================
# Producer-consumer with asyncio.Queue
# ============================================================
async def producer(queue: asyncio.Queue, name: str, count: int) -> None:
    """Produce items and put them on the queue."""
    for i in range(count):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    await queue.put(None)  # Sentinel


async def consumer(queue: asyncio.Queue, name: str) -> list[str]:
    """Consume items from the queue until sentinel."""
    processed = []
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.1)  # Simulate processing
        processed.append(item)
        queue.task_done()
    return processed


async def main_queue() -> None:
    """Async producer-consumer pipeline."""
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=10)

    prod_task = asyncio.create_task(producer(queue, "P1", 8))
    cons_task = asyncio.create_task(consumer(queue, "C1"))

    await prod_task
    results = await cons_task
    print(f"Consumed {len(results)} items")


asyncio.run(main_queue())


# ============================================================
# Timeouts and cancellation
# ============================================================
async def slow_operation() -> str:
    """Operation that takes too long."""
    await asyncio.sleep(10)
    return "done"


async def main_timeout() -> None:
    """Demonstrate timeout handling."""
    # Using asyncio.wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out after 2s")

    # Using asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(1.5):
            await slow_operation()
    except TimeoutError:
        print("Timeout context manager triggered after 1.5s")


asyncio.run(main_timeout())


# ============================================================
# Running blocking code in executor
# ============================================================
import hashlib


def cpu_bound_hash(data: bytes) -> str:
    """CPU-bound: compute many hash rounds (blocking)."""
    result = data
    for _ in range(100):
        result = hashlib.sha256(result).digest()
    return result.hex()


async def main_executor() -> None:
    """Run blocking code without freezing the event loop."""
    loop = asyncio.get_running_loop()

    # Run in thread pool (default executor)
    result = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        cpu_bound_hash,
        b"secret data",
    )
    print(f"Hash: {result[:32]}...")


asyncio.run(main_executor())

🏋️ Practice Exercise

Exercises:

  1. Write an async function that fetches 50 URLs concurrently using asyncio.gather(), but limits concurrency to 10 at a time using asyncio.Semaphore. Return the results in the original order. Compare total time vs sequential execution.

  2. Implement an async producer-consumer system with 3 producers and 2 consumers sharing an asyncio.Queue(maxsize=20). Producers generate random data, consumers process it. Use sentinel values for graceful shutdown. Track total throughput.

  3. Build an async retry decorator: @async_retry(max_attempts=3, backoff=1.0, exceptions=(ConnectionError,)) that retries an async def function with exponential backoff. Test it with a function that fails randomly.

  4. Create an async web crawler that starts from a seed URL, follows links up to depth 3, and collects page titles. Use asyncio.Semaphore to limit concurrent requests to 5. Use asyncio.TaskGroup for structured concurrency. Detect and skip already-visited URLs.

  5. Write an async context manager async_timer that measures the wall-clock time of an async with block. Use it to benchmark gather() vs sequential awaits for 20 simulated API calls with random latencies.

  6. Implement a simple async event bus: subscribe(event_name, callback), publish(event_name, data). Callbacks are async functions. Publishing should run all subscribers concurrently and collect results. Add timeout handling per subscriber.

⚠️ Common Mistakes

  • Calling a coroutine function without await and wondering why nothing happens. fetch_data() returns a coroutine object, not a result. You must await fetch_data() or schedule it with create_task(). Python emits a "coroutine 'fetch_data' was never awaited" RuntimeWarning when the unawaited object is garbage-collected.

  • Using time.sleep() instead of await asyncio.sleep() inside async code. time.sleep() blocks the entire event loop, freezing ALL coroutines. Always use async equivalents: aiohttp instead of requests, asyncpg instead of psycopg2, etc.

  • Creating tasks with asyncio.create_task() but never awaiting or storing the reference. Unawaited tasks can be garbage-collected before completion, silently dropping their work and any exceptions they raise.

  • Trying to call asyncio.run() from inside an already-running event loop (e.g., in Jupyter or some web frameworks). Use await directly or loop.create_task() instead. In Jupyter, use await main() directly since the notebook already runs an event loop.

  • Not handling asyncio.CancelledError properly. When a task is cancelled (e.g., via timeout), CancelledError is raised at the await point. Since Python 3.8, CancelledError inherits from BaseException, not Exception, so a broad `except Exception` no longer swallows it — but a bare `except:` or `except BaseException:` still does. If you catch it for cleanup, always re-raise it afterwards.
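The third pitfall has a standard fix: keep a strong reference to every background task, for example in a set, and discard it when the task completes. This is the pattern recommended by the asyncio documentation; the helper names here are illustrative.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()


async def log_event(event: str) -> None:
    await asyncio.sleep(0.01)  # Simulate writing to a log sink
    print(f"logged: {event}")


def fire_and_forget(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # Strong reference: not GC-able
    task.add_done_callback(background_tasks.discard)  # Drop it once finished
    return task


async def main() -> None:
    fire_and_forget(log_event("user signed in"))
    fire_and_forget(log_event("cache warmed"))
    # Give background work a chance to finish before the loop closes
    await asyncio.sleep(0.05)


asyncio.run(main())
print(len(background_tasks))  # -> 0 (all tasks done and discarded)
```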
