Asyncio (async/await)
📖 Concept
asyncio is Python's built-in framework for writing concurrent I/O-bound code using a single-threaded event loop. It uses async/await syntax to write asynchronous code that reads like synchronous code while handling thousands of concurrent operations efficiently.
Core concepts:
- Coroutine — A function defined with `async def`. Calling it returns a coroutine object (not a result), which must be awaited or scheduled as a task.
- Event loop — The central scheduler that runs coroutines, handles I/O callbacks, and manages timers. Only one loop runs per thread.
- `await` — Suspends the current coroutine and yields control back to the event loop until the awaited operation completes.
- Task — A wrapper around a coroutine that schedules it for concurrent execution. Created with `asyncio.create_task()`.
- `asyncio.gather()` — Runs multiple coroutines concurrently and collects all their results.
- `asyncio.TaskGroup` (3.11+) — Structured concurrency: a context manager that runs tasks and cancels all of them on the first failure.
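The difference between a coroutine object and its result can be seen directly. A minimal sketch (the `greet` coroutine is made up for illustration):

```python
import asyncio


async def greet() -> str:
    return "hello"


coro = greet()                    # Calling creates a coroutine object; nothing runs yet
print(type(coro).__name__)        # coroutine
result = asyncio.run(greet())     # The event loop drives a coroutine to completion
print(result)                     # hello
coro.close()                      # Close the unused coroutine to avoid a RuntimeWarning
```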
How it works (simplified):
1. Event loop picks a ready coroutine
2. Coroutine runs until it hits 'await' (I/O, sleep, etc.)
3. Coroutine is suspended, control returns to event loop
4. Event loop picks the next ready coroutine
5. When I/O completes, the suspended coroutine becomes ready again
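The five steps above can be observed by tracing two coroutines — a minimal sketch (worker names and delays are arbitrary):

```python
import asyncio

events: list[str] = []


async def worker(name: str, pause: float) -> None:
    events.append(f"{name}: step 1")
    await asyncio.sleep(pause)   # Suspend here; the loop runs the other worker
    events.append(f"{name}: step 2")


async def main() -> None:
    # gather() schedules both workers; execution interleaves at the await points
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))


asyncio.run(main())
print(events)
# ['A: step 1', 'B: step 1', 'B: step 2', 'A: step 2']
```

A runs until its `await`, then B runs until its `await`; B's shorter sleep completes first, so B resumes before A.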
asyncio vs threading:
| Feature | asyncio | threading |
|---|---|---|
| Concurrency model | Cooperative (explicit `await`) | Preemptive (OS switches) |
| Context switches | At `await` points (predictable) | Anytime (unpredictable) |
| Shared state safety | No data races between `await` points (single-threaded) | Requires locks |
| Overhead per task | Very low (~KB) | Higher (~MB stack per thread) |
| Best for | Many concurrent I/O ops (10K+) | Moderate I/O concurrency; CPU-bound work in C extensions that release the GIL |
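A quick sketch of the "shared state safety" row: because the loop only switches tasks at `await` points, a read-modify-write with no `await` in between needs no lock (the counter setup here is illustrative):

```python
import asyncio

counter = 0


async def increment(n: int) -> None:
    global counter
    for _ in range(n):
        # Read-modify-write with no await in between: the event loop cannot
        # switch tasks mid-update, so no lock is needed.
        counter += 1
        await asyncio.sleep(0)  # Yield control between iterations


async def main() -> None:
    await asyncio.gather(*(increment(1000) for _ in range(10)))


asyncio.run(main())
print(counter)  # Always 10000 — no lock needed
```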
When to use asyncio: High-concurrency I/O scenarios — web servers, API gateways, chat servers, web scrapers, microservice communication. When you need to handle thousands of simultaneous connections with minimal resources.
Key libraries: aiohttp (HTTP client/server), asyncpg (PostgreSQL), aioredis (Redis), aiomysql (MySQL), websockets, httpx (async HTTP).
💻 Code Example
```python
# ============================================================
# Basics: coroutines, await, and asyncio.run()
# ============================================================
import asyncio
import time
from typing import Any


async def fetch_data(url: str, delay: float) -> dict[str, Any]:
    """Simulate an async HTTP request."""
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Non-blocking sleep (simulates I/O)
    return {"url": url, "status": 200, "data": f"Response from {url}"}


async def main_sequential() -> None:
    """Sequential execution: each await blocks before the next starts."""
    start = time.perf_counter()

    r1 = await fetch_data("https://api.example.com/users", 1.0)
    r2 = await fetch_data("https://api.example.com/posts", 1.5)
    r3 = await fetch_data("https://api.example.com/comments", 0.8)

    elapsed = time.perf_counter() - start
    print(f"Sequential: {elapsed:.2f}s")  # ~3.3s (sum of delays)


async def main_concurrent() -> None:
    """Concurrent execution: all requests run simultaneously."""
    start = time.perf_counter()

    results = await asyncio.gather(
        fetch_data("https://api.example.com/users", 1.0),
        fetch_data("https://api.example.com/posts", 1.5),
        fetch_data("https://api.example.com/comments", 0.8),
    )

    elapsed = time.perf_counter() - start
    print(f"Concurrent: {elapsed:.2f}s")  # ~1.5s (max of delays)
    for r in results:
        print(f"  {r['url']}: {r['status']}")


asyncio.run(main_sequential())
asyncio.run(main_concurrent())


# ============================================================
# Tasks: create_task for fire-and-forget concurrency
# ============================================================
async def download_file(name: str, size_mb: float) -> str:
    """Simulate downloading a file."""
    chunks = int(size_mb * 10)
    for _ in range(chunks):
        await asyncio.sleep(0.05)  # Simulate chunk download
    return f"{name} ({size_mb}MB)"


async def main_tasks() -> None:
    """Create tasks for concurrent execution."""
    # create_task schedules coroutines immediately
    task1 = asyncio.create_task(download_file("report.pdf", 2.0))
    task2 = asyncio.create_task(download_file("data.csv", 1.5))
    task3 = asyncio.create_task(download_file("image.png", 0.5))

    # Do other work while downloads proceed
    print("Downloads started, doing other work...")
    await asyncio.sleep(0.1)

    # Await results when needed
    results = await asyncio.gather(task1, task2, task3)
    for r in results:
        print(f"  Downloaded: {r}")


asyncio.run(main_tasks())


# ============================================================
# TaskGroup for structured concurrency (Python 3.11+)
# ============================================================
async def process_item(item_id: int) -> dict:
    """Process a single item asynchronously."""
    await asyncio.sleep(0.1 * (item_id % 5))
    if item_id == 7:
        raise ValueError(f"Item {item_id} is invalid")
    return {"id": item_id, "processed": True}


async def main_taskgroup() -> None:
    """TaskGroup cancels all tasks if any one fails."""
    try:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(process_item(i))
                for i in range(10)
            ]
        # If we get here, all tasks succeeded
        results = [t.result() for t in tasks]
        print(f"All {len(results)} items processed")
    except* ValueError as eg:
        # ExceptionGroup from TaskGroup (3.11+)
        for exc in eg.exceptions:
            print(f"Failed: {exc}")


# asyncio.run(main_taskgroup())


# ============================================================
# Semaphore for limiting concurrency
# ============================================================
async def rate_limited_fetch(
    sem: asyncio.Semaphore,
    url: str,
    session_id: int,
) -> dict:
    """Fetch with concurrency limit via semaphore."""
    async with sem:  # At most N concurrent fetches
        print(f"[{session_id}] Fetching {url}")
        await asyncio.sleep(0.5)  # Simulate request
        return {"url": url, "session": session_id, "status": 200}


async def main_rate_limited() -> None:
    """Fetch 20 URLs with at most 5 concurrent requests."""
    sem = asyncio.Semaphore(5)
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]

    tasks = [
        rate_limited_fetch(sem, url, i) for i, url in enumerate(urls)
    ]

    start = time.perf_counter()
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"Fetched {len(results)} URLs in {elapsed:.2f}s")
    # 20 URLs, 5 at a time, 0.5s each = ~2.0s


asyncio.run(main_rate_limited())


# ============================================================
# Async generators and async iteration
# ============================================================
async def async_range(start: int, stop: int, delay: float = 0.1):
    """Async generator that yields numbers with a delay."""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i


async def main_async_iter() -> None:
    """Consume async generators with 'async for'."""
    total = 0
    async for num in async_range(1, 11, delay=0.05):
        total += num
        print(f"  Received: {num}")
    print(f"Total: {total}")


asyncio.run(main_async_iter())


# ============================================================
# Producer-consumer with asyncio.Queue
# ============================================================
async def producer(queue: asyncio.Queue, name: str, count: int) -> None:
    """Produce items and put them on the queue."""
    for i in range(count):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    await queue.put(None)  # Sentinel


async def consumer(queue: asyncio.Queue, name: str) -> list[str]:
    """Consume items from the queue until sentinel."""
    processed = []
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.1)  # Simulate processing
        processed.append(item)
        queue.task_done()
    return processed


async def main_queue() -> None:
    """Async producer-consumer pipeline."""
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=10)

    prod_task = asyncio.create_task(producer(queue, "P1", 8))
    cons_task = asyncio.create_task(consumer(queue, "C1"))

    await prod_task
    results = await cons_task
    print(f"Consumed {len(results)} items")


asyncio.run(main_queue())


# ============================================================
# Timeouts and cancellation
# ============================================================
async def slow_operation() -> str:
    """Operation that takes too long."""
    await asyncio.sleep(10)
    return "done"


async def main_timeout() -> None:
    """Demonstrate timeout handling."""
    # Using asyncio.wait_for
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out after 2s")

    # Using asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(1.5):
            await slow_operation()
    except TimeoutError:
        print("Timeout context manager triggered after 1.5s")


asyncio.run(main_timeout())


# ============================================================
# Running blocking code in executor
# ============================================================
import hashlib


def cpu_bound_hash(data: bytes) -> str:
    """CPU-bound: compute many hash rounds (blocking)."""
    result = data
    for _ in range(100):
        result = hashlib.sha256(result).digest()
    return result.hex()


async def main_executor() -> None:
    """Run blocking code without freezing the event loop."""
    loop = asyncio.get_running_loop()

    # Run in thread pool (default executor)
    result = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        cpu_bound_hash,
        b"secret data",
    )
    print(f"Hash: {result[:32]}...")


asyncio.run(main_executor())
```
🏋️ Practice Exercise
Exercises:
1. Write an async function that fetches 50 URLs concurrently using `asyncio.gather()`, but limits concurrency to 10 at a time using `asyncio.Semaphore`. Return the results in the original order. Compare total time vs sequential execution.
2. Implement an async producer-consumer system with 3 producers and 2 consumers sharing an `asyncio.Queue(maxsize=20)`. Producers generate random data, consumers process it. Use sentinel values for graceful shutdown. Track total throughput.
3. Build an async retry decorator `@async_retry(max_attempts=3, backoff=1.0, exceptions=(ConnectionError,))` that retries an `async def` function with exponential backoff. Test it with a function that fails randomly.
4. Create an async web crawler that starts from a seed URL, follows links up to depth 3, and collects page titles. Use `asyncio.Semaphore` to limit concurrent requests to 5. Use `asyncio.TaskGroup` for structured concurrency. Detect and skip already-visited URLs.
5. Write an async context manager `async_timer` that measures the wall-clock time of an `async with` block. Use it to benchmark `gather()` vs sequential awaits for 20 simulated API calls with random latencies.
6. Implement a simple async event bus: `subscribe(event_name, callback)`, `publish(event_name, data)`. Callbacks are async functions. Publishing should run all subscribers concurrently and collect results. Add timeout handling per subscriber.
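As a hint for the retry-decorator exercise, one possible shape — a sketch, not the only valid solution; `flaky` and its call counter are illustrative stand-ins for a real unreliable operation:

```python
import asyncio
import functools


def async_retry(max_attempts: int = 3, backoff: float = 1.0,
                exceptions: tuple = (ConnectionError,)):
    """Retry an async function with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            delay = backoff
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # Out of attempts: propagate the last error
                    await asyncio.sleep(delay)
                    delay *= 2  # Exponential backoff
        return wrapper
    return decorator


calls = {"n": 0}


@async_retry(max_attempts=4, backoff=0.01)
async def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(flaky())
print(result, calls["n"])  # succeeds on the third attempt
```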
⚠️ Common Mistakes
- Calling a coroutine function without `await` and wondering why nothing happens. `fetch_data()` returns a coroutine object, not a result. You must `await fetch_data()` or schedule it with `create_task()`. Python emits a "coroutine was never awaited" RuntimeWarning.
- Using `time.sleep()` instead of `await asyncio.sleep()` inside async code. `time.sleep()` blocks the entire event loop, freezing ALL coroutines. Always use async equivalents: `aiohttp` instead of `requests`, `asyncpg` instead of `psycopg2`, and so on.
- Creating tasks with `asyncio.create_task()` but never awaiting or storing the reference. Unawaited tasks can be garbage-collected before completion, silently dropping their work and any exceptions they raise.
- Calling `asyncio.run()` from inside an already-running event loop (e.g., in Jupyter or some web frameworks), which raises RuntimeError. Use `await` directly or schedule work with `asyncio.create_task()`; in Jupyter, `await main()` works at the top level because the notebook already runs an event loop.
- Not handling `asyncio.CancelledError` properly. When a task is cancelled (e.g., via timeout), `CancelledError` is raised at the `await` point. Since Python 3.8, `CancelledError` inherits from `BaseException` rather than `Exception`, so a broad `except Exception` no longer swallows it — but a bare `except` or `except BaseException` still does. If you catch it for cleanup, always re-raise it.
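The last point can be sketched concretely: a worker that cleans up on cancellation and then re-raises, so the caller still observes the cancellation (names here are illustrative):

```python
import asyncio

log: list[str] = []


async def guarded_worker() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup done")   # Release resources, flush buffers, etc.
        raise                        # Re-raise so cancellation propagates


async def main() -> None:
    task = asyncio.create_task(guarded_worker())
    await asyncio.sleep(0.01)        # Let the worker start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("caller saw cancellation")


asyncio.run(main())
print(log)  # ['cleanup done', 'caller saw cancellation']
```

Swallowing the `CancelledError` instead of re-raising would make the task appear to finish normally, defeating timeouts and `TaskGroup` cancellation.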