Context Managers
📖 Concept
Context managers ensure that resources are properly acquired and released, regardless of whether an operation succeeds or fails. The with statement guarantees cleanup code runs, even if exceptions occur — no more forgotten file.close() calls or unreleased locks.
The context manager protocol:
with expression as variable:
    # __enter__() has been called; its return value is bound to variable
    body
# __exit__() is called here whenever __enter__() succeeded (even on exception)
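As a rough sketch of what the with statement does for you, the protocol above can be written out by hand. This is a simplified approximation rather than the interpreter's exact expansion; Tracer, manual_with, and failing_body are hypothetical names invented for this illustration.

```python
import sys


class Tracer:
    """Hypothetical manager that records which protocol methods ran."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        name = exc_type.__name__ if exc_type else None
        self.events.append(f"exit:{name}")
        return False  # Falsy: let any exception propagate


def manual_with(manager, body):
    """Roughly what `with manager as value: body(value)` does."""
    value = manager.__enter__()
    try:
        body(value)
    except BaseException:
        # Hand the exception details to __exit__; re-raise unless it returns truthy
        if not manager.__exit__(*sys.exc_info()):
            raise
    else:
        # Normal completion: all three arguments are None
        manager.__exit__(None, None, None)


def failing_body(tracer):
    raise ValueError("boom")


tracer = Tracer()
try:
    manual_with(tracer, failing_body)
except ValueError:
    pass
print(tracer.events)  # ['enter', 'exit:ValueError']
```

Note that __exit__ only runs if __enter__ returned successfully; an exception raised inside __enter__ propagates without any cleanup call.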
Two ways to create context managers:
| Approach | Best for | Complexity |
|---|---|---|
| Class-based (__enter__/__exit__) | Complex state, reusable managers | More boilerplate |
| @contextmanager (generator-based) | Simple setup/teardown, quick one-offs | Less code |
__exit__ parameters: When an exception occurs inside the with block, __exit__ receives the exception type, value, and traceback; when the block finishes normally, all three arguments are None. Returning True (or any truthy value) from __exit__ suppresses the exception; returning False (or None) lets it propagate.
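A minimal sketch of how the __exit__ arguments and return value behave. SuppressKeyError is a hypothetical class made up for this example; it records what __exit__ receives and swallows only KeyError.

```python
class SuppressKeyError:
    """Hypothetical manager: records __exit__ arguments, suppresses only KeyError."""

    def __init__(self):
        self.seen = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.seen = (exc_type, exc_val)
        # Truthy return value suppresses; falsy lets the exception propagate
        return exc_type is not None and issubclass(exc_type, KeyError)


# 1. No exception: __exit__ receives None for every argument
with SuppressKeyError() as clean:
    pass

# 2. KeyError: suppressed, so execution continues after the block
with SuppressKeyError() as swallowed:
    {}["missing"]

# 3. Any other exception: __exit__ still runs, then the exception propagates
propagated = SuppressKeyError()
try:
    with propagated:
        int("not a number")
except ValueError as exc:
    caught = exc

print(clean.seen)          # (None, None)
print(swallowed.seen[0])   # <class 'KeyError'>
print(propagated.seen[0])  # <class 'ValueError'>
```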
Nested context managers can be combined with contextlib.ExitStack for dynamic or variable-length resource management:
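from contextlib import ExitStack

with ExitStack() as stack:
    # file_list is any iterable of paths; every file opened here is closed on exit
    files = [stack.enter_context(open(f)) for f in file_list]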
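To make the unwinding order visible, here is a small sketch in which ExitStack releases everything it entered in reverse (last in, first out) order. The resource helper and the events list are hypothetical stand-ins for real resources.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def resource(name):
    """Hypothetical resource that logs acquire/release into events."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


with ExitStack() as stack:
    names = [stack.enter_context(resource(n)) for n in ("a", "b", "c")]
    # Plain callables can be registered too; they run on exit like any manager
    stack.callback(events.append, "callback")
    events.append("body")

print(events)
# ['open a', 'open b', 'open c', 'body', 'callback', 'close c', 'close b', 'close a']
```

The same unwinding happens if entering a later resource fails partway through the list: everything already entered on the stack is still released.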
Async context managers use __aenter__/__aexit__ and are entered with async with. The @asynccontextmanager decorator from contextlib simplifies creation. They're essential for managing async resources like database connections, HTTP sessions, and network sockets in asyncio code.
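The sketch below shows both async styles working together and can be run directly with asyncio.run. FakeConnection and transaction are hypothetical names, and the awaits only simulate I/O; a real driver would expose its own connection API.

```python
import asyncio
from contextlib import asynccontextmanager

log = []


class FakeConnection:
    """Hypothetical stand-in for an async resource such as a DB connection."""

    async def __aenter__(self):
        await asyncio.sleep(0)  # Pretend to connect
        log.append("connect")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await asyncio.sleep(0)  # Pretend to disconnect
        log.append("disconnect")
        return False  # Do not suppress exceptions


@asynccontextmanager
async def transaction(conn):
    """Commit on success, roll back (and re-raise) on failure."""
    log.append("begin")
    try:
        yield conn
    except Exception:
        log.append("rollback")
        raise
    else:
        log.append("commit")


async def main():
    # Success path
    async with FakeConnection() as conn:
        async with transaction(conn):
            log.append("work")

    # Failure path: both managers still clean up, innermost first
    try:
        async with FakeConnection() as conn, transaction(conn):
            raise RuntimeError("boom")
    except RuntimeError:
        pass


asyncio.run(main())
print(log)
```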
💻 Code Example
# ============================================================
# Class-based context manager
# ============================================================
import asyncio
import contextlib
import logging
import os
import shutil
import tempfile
import time
from typing import Optional

logger = logging.getLogger(__name__)


class Timer:
    """Context manager that measures execution time."""

    def __init__(self, label: str = "Block"):
        self.label = label
        self.elapsed: Optional[float] = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self  # Bound to 'as' variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self._start
        logger.info(f"{self.label} took {self.elapsed:.4f}s")
        return False  # Don't suppress exceptions


with Timer("Data processing") as t:
    total = sum(range(1_000_000))
print(f"Elapsed: {t.elapsed:.4f}s")


# ============================================================
# Context manager with exception handling
# ============================================================
class DatabaseTransaction:
    """Manage database transactions with automatic commit/rollback."""

    def __init__(self, connection):
        self.connection = connection

    def __enter__(self):
        self.connection.begin()
        return self.connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Exception occurred: roll back
            self.connection.rollback()
            logger.error(f"Transaction rolled back: {exc_val}")
        else:
            # No exception: commit
            self.connection.commit()
        return False  # Never suppress; any exception is re-raised


# Usage:
# with DatabaseTransaction(conn) as db:
#     db.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
#     db.execute("UPDATE accounts SET balance = balance - 100 WHERE id = 1")
# Automatically commits on success, rolls back on exception


# ============================================================
# @contextmanager: generator-based (simpler)
# ============================================================
@contextlib.contextmanager
def temporary_directory(prefix: str = "tmp_"):
    """Create a temp directory, yield it, then clean up."""
    path = tempfile.mkdtemp(prefix=prefix)
    logger.info(f"Created temp dir: {path}")
    try:
        yield path  # Everything before yield = __enter__
    finally:
        # Everything after yield = __exit__ (always runs)
        shutil.rmtree(path, ignore_errors=True)
        logger.info(f"Cleaned up temp dir: {path}")


with temporary_directory("myapp_") as tmpdir:
    # Work with temporary directory
    filepath = os.path.join(tmpdir, "data.txt")
    with open(filepath, "w") as f:
        f.write("temporary data")
# Directory is automatically deleted here


# ============================================================
# @contextmanager with exception handling
# ============================================================
@contextlib.contextmanager
def managed_resource(name: str):
    """Acquire and release a named resource."""
    print(f"Acquiring {name}")
    resource = {"name": name, "active": True}
    try:
        yield resource
    except Exception as e:
        print(f"Error in {name}: {e}")
        resource["error"] = str(e)
        raise  # Re-raise after logging
    finally:
        resource["active"] = False
        print(f"Released {name}")


# ============================================================
# ExitStack for dynamic context management
# ============================================================
def process_multiple_files(file_paths: list[str]):
    """Open and process a variable number of files."""
    with contextlib.ExitStack() as stack:
        # Dynamically enter multiple context managers
        files = [
            stack.enter_context(open(path, "r"))
            for path in file_paths
        ]
        # All files are open here; ALL will be closed on exit
        for f in files:
            print(f.readline())


# ============================================================
# Suppressing specific exceptions
# ============================================================
# Instead of try/except/pass:
with contextlib.suppress(FileNotFoundError):
    os.remove("nonexistent_file.txt")
    # No error even though file doesn't exist


# ============================================================
# Temporarily patching global state (restored on exit)
# Each call to a @contextmanager function returns a fresh,
# single-use manager, so nest by calling indent_logger() again.
# ============================================================
@contextlib.contextmanager
def indent_logger(level: int = 1):
    """Context manager that indents log output."""
    prefix = " " * level
    original_factory = logging.getLogRecordFactory()

    def indented_factory(*args, **kwargs):
        record = original_factory(*args, **kwargs)
        record.msg = f"{prefix}{record.msg}"
        return record

    logging.setLogRecordFactory(indented_factory)
    try:
        yield
    finally:
        logging.setLogRecordFactory(original_factory)


# ============================================================
# Async context manager
# ============================================================
class AsyncDatabasePool:
    """Async context manager for database connection pools."""

    def __init__(self, dsn: str, min_size: int = 2, max_size: int = 10):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool = None

    async def __aenter__(self):
        # Simulate async pool creation
        print(f"Creating pool for {self.dsn}")
        await asyncio.sleep(0.1)  # Simulate connection time
        self.pool = {"dsn": self.dsn, "connections": self.min_size}
        return self.pool

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Closing pool for {self.dsn}")
        await asyncio.sleep(0.05)  # Simulate graceful shutdown
        self.pool = None
        return False


@contextlib.asynccontextmanager
async def async_http_session(base_url: str):
    """Async context manager for HTTP session lifecycle."""
    session = {"base_url": base_url, "active": True}
    print(f"Opening session to {base_url}")
    try:
        yield session
    finally:
        session["active"] = False
        print(f"Closed session to {base_url}")


# Usage:
# async def main():
#     async with AsyncDatabasePool("postgresql://...") as pool:
#         async with async_http_session("https://api.example.com") as http:
#             # Both resources managed
#             pass
🏋️ Practice Exercise
Exercises:
1. Write a class-based context manager FileBackup that copies a file to a .bak before entering the block, and restores it from backup if an exception occurs during the block. Delete the backup on successful exit.
2. Implement @contextmanager-based change_directory(path) that changes the working directory on entry and restores the original directory on exit, even if an exception occurs.
3. Build an ExitStack-based function that opens N database connections, N file handles, and N network sockets (simulated), processes data from all of them, and ensures all resources are released on exit.
4. Create a context manager timeout(seconds) using signal.alarm (Unix) that raises TimeoutError if the block takes too long. Handle the cleanup properly in __exit__.
5. Write an async context manager rate_limiter(max_concurrent) using asyncio.Semaphore that limits how many coroutines can execute the body concurrently. Test it with 20 simulated API calls limited to 5 concurrent.
⚠️ Common Mistakes
- Returning True (or any truthy value) from __exit__ by accident and silently suppressing exceptions. Only return True if you intentionally want to swallow the exception. Returning False, or not returning at all (implicit None), correctly propagates exceptions.
- Using @contextmanager but forgetting the try/finally around yield. Without finally, the cleanup code after yield won't run if an exception occurs inside the with block, which defeats the entire purpose of the context manager.
- Yielding more than once in a @contextmanager generator. The generator must yield exactly once; the yield separates setup from teardown. A second yield causes RuntimeError.
- Assuming the context manager only does its work when the result of __enter__ is bound to a variable. The as clause is optional; the manager's lifecycle (enter/exit) runs regardless.
- Creating context managers that hold resources indefinitely. If a context manager acquires a database connection in __enter__, keeping the with block open for a long time (e.g., waiting for user input) can starve the connection pool. Keep with blocks short.
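The try/finally and double-yield mistakes are easy to reproduce. The following is a small sketch; leaky, safe, and yields_twice are hypothetical generators written only to show the behavior.

```python
from contextlib import contextmanager

log = []


@contextmanager
def leaky():
    """Mistake: no try/finally, so cleanup is skipped when the body raises."""
    log.append("leaky setup")
    yield
    log.append("leaky cleanup")  # Never reached if the body raises


@contextmanager
def safe():
    """Correct: cleanup sits in finally and runs on success and failure."""
    log.append("safe setup")
    try:
        yield
    finally:
        log.append("safe cleanup")


@contextmanager
def yields_twice():
    """Mistake: a second yield after the with body has finished."""
    yield 1
    yield 2


for manager in (leaky, safe):
    try:
        with manager():
            raise ValueError("boom")
    except ValueError:
        pass

try:
    with yields_twice():
        pass
except RuntimeError as exc:
    double_yield_error = str(exc)

print(log)                 # ['leaky setup', 'safe setup', 'safe cleanup']
print(double_yield_error)  # generator didn't stop
```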