itertools & functools


📖 Concept

The itertools and functools modules are Python's standard library power tools for functional programming. They provide battle-tested, C-optimized implementations of common patterns that would otherwise require verbose, error-prone manual code. Mastering these modules is the difference between writing Python and writing Pythonic Python.

itertools provides functions that create memory-efficient iterators for looping. Every function in itertools returns an iterator, meaning it produces values lazily — only computing the next value when asked. This makes them suitable for processing datasets that don't fit in memory.
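For instance, because `count` is lazy, you can take a slice of an infinite stream in constant memory:

```python
import itertools

# count() never terminates on its own, but islice() pulls only the
# five values it needs, so this completes in constant memory.
evens = itertools.count(start=0, step=2)
first_five = list(itertools.islice(evens, 5))
print(first_five)  # [0, 2, 4, 6, 8]
```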

| Category | Functions | Purpose |
| --- | --- | --- |
| Infinite | count, cycle, repeat | Unbounded sequences |
| Terminating | chain, islice, takewhile, dropwhile, filterfalse, compress, starmap, zip_longest | Finite transformations |
| Combinatoric | product, permutations, combinations, combinations_with_replacement | Mathematical combinations |
| Grouping | groupby, pairwise (3.10+), batched (3.12+) | Structuring sequences |

functools provides higher-order functions and operations on callable objects. While itertools operates on data sequences, functools operates on functions themselves.

| Function | Purpose |
| --- | --- |
| lru_cache / cache | Memoize function results (LRU eviction; cache is unbounded) |
| partial | Fix some arguments of a function |
| reduce | Left fold over an iterable |
| singledispatch | Single-argument polymorphic dispatch |
| total_ordering | Auto-generate comparison methods from __eq__ plus one of __lt__, __le__, __gt__, __ge__ |
| wraps | Preserve wrapped function metadata |
| cached_property | One-time computed property (memoized on instance) |
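Of these, partial is the easiest to illustrate in isolation. A minimal sketch (the send function and its arguments are invented for the demo):

```python
import functools


def send(host: str, port: int, message: str) -> str:
    return f"{host}:{port} <- {message}"


# partial freezes the first two positional arguments, returning
# a new callable that only needs the remaining one.
send_local = functools.partial(send, "localhost", 8080)
print(send_local("ping"))  # localhost:8080 <- ping
```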

lru_cache is particularly important in production. It memoizes function results with a Least Recently Used eviction policy. With maxsize=None, it becomes an unbounded cache (equivalent to @cache in 3.9+). It stores results in a dict keyed by the function arguments, so arguments must be hashable. The cache is thread-safe for reads but not for the initial computation — two threads may compute the same value simultaneously.

singledispatch turns a function into a generic function with dispatch based on the type of the first argument. It's Python's answer to method overloading and is cleaner than long if isinstance(...) chains.

💻 Code Example

```python
# ============================================================
# itertools: infinite iterators with practical boundaries
# ============================================================
import itertools
import functools
import operator
from typing import Any, Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


# count: generate sequential IDs
def id_generator(prefix: str = "item") -> Iterator[str]:
    """Generate unique IDs like item_001, item_002, ..."""
    for n in itertools.count(1):
        yield f"{prefix}_{n:03d}"


gen = id_generator("order")
print([next(gen) for _ in range(3)])  # ['order_001', 'order_002', 'order_003']


# cycle: round-robin load balancing
def round_robin_balancer(servers: list[str]) -> Iterator[str]:
    """Cycle through servers endlessly for load balancing."""
    return itertools.cycle(servers)


balancer = round_robin_balancer(["server-a", "server-b", "server-c"])
assignments = [next(balancer) for _ in range(7)]
print(assignments)
# ['server-a', 'server-b', 'server-c', 'server-a', 'server-b', 'server-c', 'server-a']


# ============================================================
# itertools: terminating iterators for data pipelines
# ============================================================

# chain.from_iterable: flatten one level of nesting
nested_results = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
flat = list(itertools.chain.from_iterable(nested_results))
print(flat)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]


# takewhile / dropwhile: process until/after a condition
log_levels = ["DEBUG", "DEBUG", "INFO", "WARNING", "ERROR", "INFO"]
before_warning = list(itertools.takewhile(lambda x: x != "WARNING", log_levels))
from_warning = list(itertools.dropwhile(lambda x: x != "WARNING", log_levels))
print(before_warning)  # ['DEBUG', 'DEBUG', 'INFO']
print(from_warning)  # ['WARNING', 'ERROR', 'INFO']


# compress: filter with a selector mask
data = ["alpha", "beta", "gamma", "delta", "epsilon"]
mask = [True, False, True, False, True]
selected = list(itertools.compress(data, mask))
print(selected)  # ['alpha', 'gamma', 'epsilon']


# starmap: unpack arguments from tuples
coordinates = [(2, 5), (3, 2), (10, 3)]
powers = list(itertools.starmap(pow, coordinates))
print(powers)  # [32, 9, 1000]


# zip_longest: zip with fill value for unequal lengths
names = ["Alice", "Bob", "Charlie"]
scores = [95, 87]
paired = list(itertools.zip_longest(names, scores, fillvalue=0))
print(paired)  # [('Alice', 95), ('Bob', 87), ('Charlie', 0)]


# ============================================================
# itertools: combinatoric generators
# ============================================================

# product: cartesian product (replaces nested loops)
sizes = ["S", "M", "L"]
colors = ["red", "blue"]
variants = list(itertools.product(sizes, colors))
print(variants)
# [('S','red'), ('S','blue'), ('M','red'), ('M','blue'), ('L','red'), ('L','blue')]

# combinations: unique pairs from a team
team = ["Alice", "Bob", "Charlie", "Diana"]
pairs = list(itertools.combinations(team, 2))
print(pairs)
# [('Alice','Bob'), ('Alice','Charlie'), ('Alice','Diana'),
#  ('Bob','Charlie'), ('Bob','Diana'), ('Charlie','Diana')]


# ============================================================
# itertools recipes: groupby, batched, pairwise
# ============================================================

# groupby: group sorted data by key
transactions = [
    {"date": "2025-01-15", "amount": 100},
    {"date": "2025-01-15", "amount": 200},
    {"date": "2025-01-16", "amount": 150},
    {"date": "2025-01-16", "amount": 75},
    {"date": "2025-01-17", "amount": 300},
]

for date, group in itertools.groupby(transactions, key=lambda t: t["date"]):
    daily_total = sum(t["amount"] for t in group)
    print(f"{date}: ${daily_total}")
# 2025-01-15: $300
# 2025-01-16: $225
# 2025-01-17: $300


# batched (Python 3.12+) — process in chunks
# For older Python, use the recipe:
def batched(iterable: Iterable[T], n: int) -> Iterator[tuple[T, ...]]:
    """Batch data into tuples of length n. Last batch may be shorter."""
    it = iter(iterable)
    while batch := tuple(itertools.islice(it, n)):
        yield batch


records = list(range(10))
for batch in batched(records, 3):
    print(f"Processing batch: {batch}")
# Processing batch: (0, 1, 2)
# Processing batch: (3, 4, 5)
# Processing batch: (6, 7, 8)
# Processing batch: (9,)


# pairwise (Python 3.10+) — sliding window of size 2
def pairwise(iterable):
    """Return successive overlapping pairs."""
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


temps = [68, 72, 65, 70, 74, 69]
changes = [(b - a) for a, b in pairwise(temps)]
print(changes)  # [4, -7, 5, 4, -5]


# ============================================================
# functools.lru_cache: production memoization
# ============================================================

@functools.lru_cache(maxsize=256)
def expensive_query(user_id: int, include_deleted: bool = False) -> dict:
    """Simulate an expensive database query with caching."""
    print(f"  [CACHE MISS] Querying user {user_id}")
    return {"id": user_id, "name": f"User_{user_id}", "deleted": include_deleted}


# First call: cache miss
result1 = expensive_query(42)
# Second call: cache hit (no print)
result2 = expensive_query(42)
print(result1 is result2)  # True (same cached object)

# Inspect cache performance
info = expensive_query.cache_info()
print(f"Hits: {info.hits}, Misses: {info.misses}, Size: {info.currsize}")

# Clear cache when data changes
expensive_query.cache_clear()


# lru_cache with unhashable arguments workaround
def make_hashable(obj: Any) -> Any:
    """Convert unhashable types to hashable equivalents."""
    if isinstance(obj, dict):
        return tuple(sorted((k, make_hashable(v)) for k, v in obj.items()))
    if isinstance(obj, (list, tuple)):
        return tuple(make_hashable(item) for item in obj)
    if isinstance(obj, set):
        return frozenset(make_hashable(item) for item in obj)
    return obj


def hashable_cache(func: Callable) -> Callable:
    """Cache decorator that handles unhashable arguments."""
    @functools.lru_cache(maxsize=128)
    def cached(*args):
        return func(*args)

    @functools.wraps(func)
    def wrapper(*args):
        hashable_args = tuple(make_hashable(a) for a in args)
        return cached(*hashable_args)

    wrapper.cache_info = cached.cache_info
    wrapper.cache_clear = cached.cache_clear
    return wrapper


# ============================================================
# functools.singledispatch: type-based polymorphism
# ============================================================

@functools.singledispatch
def serialize(obj) -> str:
    """Serialize an object to a string representation."""
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


@serialize.register(int)
@serialize.register(float)
def _(obj) -> str:
    return str(obj)


@serialize.register(str)
def _(obj) -> str:
    return f'"{obj}"'


@serialize.register(list)
def _(obj) -> str:
    items = ", ".join(serialize(item) for item in obj)
    return f"[{items}]"


@serialize.register(dict)
def _(obj) -> str:
    pairs = ", ".join(f"{serialize(k)}: {serialize(v)}" for k, v in obj.items())
    return "{" + pairs + "}"


print(serialize(42))                # "42"
print(serialize("hello"))           # '"hello"'
print(serialize([1, "two", 3.0]))   # '[1, "two", 3.0]'
print(serialize({"a": 1, "b": [2]}))  # '{"a": 1, "b": [2]}'


# ============================================================
# functools.total_ordering: auto-generate comparison methods
# ============================================================

@functools.total_ordering
class Version:
    """Semantic version with auto-generated comparison operators."""

    def __init__(self, major: int, minor: int, patch: int):
        self.major = major
        self.minor = minor
        self.patch = patch

    def __eq__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return (self.major, self.minor, self.patch) == (
            other.major, other.minor, other.patch
        )

    def __lt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return (self.major, self.minor, self.patch) < (
            other.major, other.minor, other.patch
        )

    def __repr__(self):
        return f"Version({self.major}.{self.minor}.{self.patch})"


v1 = Version(2, 1, 0)
v2 = Version(2, 3, 1)
v3 = Version(2, 1, 0)

print(v1 < v2)   # True (from __lt__)
print(v1 <= v3)  # True (auto-generated)
print(v2 > v1)   # True (auto-generated)
print(v2 >= v1)  # True (auto-generated)
print(sorted([v2, v1, Version(1, 0, 0)]))
# [Version(1.0.0), Version(2.1.0), Version(2.3.1)]


# ============================================================
# functools.reduce: fold operations
# ============================================================

# Summing nested values with reduce
data = [{"value": 10}, {"value": 25}, {"value": 15}]
total = functools.reduce(lambda acc, d: acc + d["value"], data, 0)
print(total)  # 50

# Building a nested dict path accessor
def get_nested(data: dict, path: str, default=None):
    """Access nested dict values with dot notation: 'a.b.c'."""
    try:
        return functools.reduce(operator.getitem, path.split("."), data)
    except (KeyError, TypeError):
        return default


config = {"db": {"host": "localhost", "port": 5432, "pool": {"min": 2, "max": 10}}}
print(get_nested(config, "db.host"))      # "localhost"
print(get_nested(config, "db.pool.max"))  # 10
print(get_nested(config, "db.missing", 0))  # 0
```

🏋️ Practice Exercise

Exercises:

  1. Using only itertools functions, write a sliding_window(iterable, n) generator that yields overlapping windows of size n. For example, sliding_window([1,2,3,4,5], 3) yields (1,2,3), (2,3,4), (3,4,5). Compare your implementation with itertools.pairwise and Python 3.12's itertools.batched.

  2. Implement a @memoize_with_ttl(seconds) decorator using functools.wraps that caches results like lru_cache but expires entries after the given TTL. Store timestamps alongside cached values. Add .cache_info() and .cache_clear() methods.

  3. Use functools.singledispatch to build a to_html(obj) function that renders Python objects as HTML: str to <p>, list to <ul><li>...</li></ul>, dict to an HTML <table>, and custom dataclasses to a <div> with labeled fields.

  4. Write a function parallel_map(func, iterables, chunk_size) that uses itertools.batched (or your own implementation) to split work into chunks, applies func to each chunk using concurrent.futures.ProcessPoolExecutor, and yields results in order.

  5. Build a CLI-style pipeline processor using itertools.chain, filterfalse, takewhile, and groupby: read a simulated log stream, filter out DEBUG lines, group by timestamp hour, and compute per-hour error counts.

  6. Implement @functools.total_ordering on a Priority class with a name and numeric priority. Then use it with heapq to build a working priority queue. Demonstrate that all six comparison operators work correctly.

⚠️ Common Mistakes

  • Passing unhashable arguments (lists, dicts, sets) to @lru_cache-decorated functions. Since the cache uses a dict internally, all arguments must be hashable. Convert lists to tuples and dicts to frozenset(d.items()) before caching, or use a custom wrapper.
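A minimal sketch of both the failure and the workaround (the total function is invented for the demo):

```python
import functools


@functools.lru_cache(maxsize=None)
def total(values) -> int:
    return sum(values)


try:
    total([1, 2, 3])  # the cache key is hashed before the function even runs
except TypeError as exc:
    print(exc)  # unhashable type: 'list'

print(total((1, 2, 3)))  # 6 -- tuples are hashable, so this caches fine
```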

  • Using itertools.groupby on unsorted data. groupby only groups consecutive elements with the same key. If your data is [A, B, A, B], groupby yields four groups, not two. Sort first with sorted(data, key=keyfunc), or use collections.defaultdict for non-consecutive grouping.
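To see the difference concretely:

```python
import itertools

data = ["A", "B", "A", "B"]

# Unsorted input: four runs of length 1, so four groups.
print([key for key, _ in itertools.groupby(data)])          # ['A', 'B', 'A', 'B']

# Sorting first merges equal keys into consecutive runs.
print([key for key, _ in itertools.groupby(sorted(data))])  # ['A', 'B']
```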

  • Forgetting that functools.reduce with no initial value raises TypeError on an empty iterable. Always provide an initial value (third argument) in production code: reduce(fn, iterable, initial).
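For example:

```python
import functools
import operator

try:
    functools.reduce(operator.add, [])  # no initial value
except TypeError as exc:
    print(exc)  # reduce() of empty iterable with no initial value

# An initial value makes the empty case well-defined.
print(functools.reduce(operator.add, [], 0))  # 0
```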

  • Using @lru_cache on methods without understanding that self is part of the cache key. Each instance creates separate cache entries, and instances are kept alive by the cache (memory leak). Use @functools.cached_property for per-instance caching of properties, or use a weakref-based approach for methods.
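A quick sketch of the cached_property alternative (the Report class is invented for the demo):

```python
import functools


class Report:
    def __init__(self, rows: list[int]):
        self.rows = rows

    @functools.cached_property
    def total(self) -> int:
        # Runs once per instance; the result is stored in the
        # instance's own __dict__, not in a module-level cache,
        # so the instance can still be garbage-collected.
        print("computing...")
        return sum(self.rows)


r = Report([1, 2, 3])
print(r.total)  # computing... then 6
print(r.total)  # 6 (no recomputation)
```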

  • Consuming an itertools iterator by accidentally converting it to a list during debugging (e.g., print(list(iterator))) and then trying to use the exhausted iterator downstream. Use itertools.tee to duplicate, or restructure so the iterator is consumed only once.
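For example, tee lets you take a debugging peek without exhausting the pipeline's copy:

```python
import itertools

stream = iter(range(3))

# Duplicate the iterator *before* anything consumes it; after tee,
# use only the two copies, never the original stream.
debug_view, pipeline = itertools.tee(stream)

print(list(debug_view))  # [0, 1, 2]  safe to inspect
print(list(pipeline))    # [0, 1, 2]  still intact for downstream use
```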
