Advanced Stream Patterns

📖 Concept

Beyond basic piping, streams unlock powerful data processing patterns for production applications: chaining, multiplexing, throttling, and async iteration.

Stream chaining (pipeline): Chain multiple Transform streams to create data processing pipelines — like Unix pipes:

readStream → decompress → decrypt → parse → filter → format → writeStream

Object mode streams: By default, streams work with Buffers/strings. Setting objectMode: true lets streams pass JavaScript objects — essential for data processing pipelines.

Async iteration (for await...of): Since Node.js 10, readable streams are async iterables:

for await (const chunk of readableStream) {
  process.stdout.write(chunk);
}

Web Streams API (Node.js 16+): Node.js now supports the Web Streams API (ReadableStream, WritableStream, TransformStream) — the same API used in browsers. This enables code sharing between Node.js and browsers.

When to use streams vs. buffers:

| Scenario | Use |
| --- | --- |
| File < 50MB | `readFile` (simpler) |
| File > 50MB | `createReadStream` (memory efficient) |
| Real-time data | Streams (WebSocket, SSE) |
| ETL pipelines | Transform streams |
| HTTP responses | Stream directly (pipe to response) |
| Log processing | Line-by-line stream reading |

🏠 Real-world analogy: Advanced streams are like an assembly line in a factory — each station (Transform) does one job, and items move automatically from station to station. You can add, remove, or rearrange stations without redesigning the whole factory.

💻 Code Example

Tap to expand ⛶
// Advanced Stream Patterns

const { Readable, Writable, Transform, pipeline } = require("stream");
const { promisify } = require("util");
const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");

// Promisified pipeline: await-able, propagates errors, destroys all streams on failure.
const pipelineAsync = promisify(pipeline);

// 1. Custom Readable stream — Generate data on demand
/**
 * Demand-driven Readable (objectMode) that emits `{ number, timestamp }`
 * records, one per _read() call, after a simulated 10ms async delay.
 * Ends the stream (push(null)) once `max` records have been produced.
 */
class NumberGenerator extends Readable {
  constructor(max) {
    super({ objectMode: true }); // emit JS objects, not Buffers
    this.max = max;
    this.current = 0;
  }

  _read() {
    // Exhausted — signal end of stream.
    if (this.current >= this.max) {
      this.push(null);
      return;
    }
    // Simulate an asynchronous data source (e.g. a paginated API).
    setTimeout(() => {
      const record = { number: this.current, timestamp: Date.now() };
      this.current += 1;
      this.push(record);
    }, 10);
  }
}

// 2. Custom Writable stream — Database batch inserter
/**
 * Writable (objectMode) that buffers incoming records and "inserts" them
 * in batches of `batchSize`, flushing any remainder when the stream ends.
 *
 * Fix vs. original: `async _write`/`_final` let a rejected flushBatch()
 * escape as an unhandled rejection and never invoked the callback, which
 * stalls the pipeline forever. Errors are now routed to the callback so
 * pipeline()/'error' listeners observe them.
 */
class BatchInserter extends Writable {
  constructor(batchSize = 100) {
    super({ objectMode: true });
    this.batch = [];
    this.batchSize = batchSize;
    this.totalInserted = 0;
  }

  async _write(record, encoding, callback) {
    this.batch.push(record);
    try {
      if (this.batch.length >= this.batchSize) {
        await this.flushBatch();
      }
      callback();
    } catch (err) {
      callback(err); // surface insert failures to the stream
    }
  }

  // Called before 'finish': flush the trailing partial batch.
  async _final(callback) {
    try {
      if (this.batch.length > 0) {
        await this.flushBatch();
      }
      console.log(`Total records inserted: ${this.totalInserted}`);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  async flushBatch() {
    // Simulate database batch insert
    console.log(`Inserting batch of ${this.batch.length} records...`);
    // await db.collection("data").insertMany(this.batch);
    this.totalInserted += this.batch.length;
    this.batch = [];
  }
}

// 3. Transform pipeline — ETL (Extract, Transform, Load)
/**
 * Object-mode Transform that forwards only the records for which
 * `predicate(record)` is truthy — the filter step of an ETL pipeline.
 */
class FilterTransform extends Transform {
  constructor(predicate) {
    super({ objectMode: true });
    this.predicate = predicate;
  }

  _transform(record, encoding, done) {
    if (this.predicate(record)) this.push(record);
    done();
  }
}

/**
 * Object-mode Transform that applies `mapper` to every record and
 * forwards the result — the transform step of an ETL pipeline.
 */
class MapTransform extends Transform {
  constructor(mapper) {
    super({ objectMode: true });
    this.mapper = mapper;
  }

  _transform(record, encoding, done) {
    const mapped = this.mapper(record);
    this.push(mapped);
    done();
  }
}

// Usage: Generate → Filter → Transform → Insert
/**
 * Demo ETL run: generate 1000 numbered records, keep the even ones,
 * enrich each record, and batch-insert 50 at a time. pipeline() wires
 * up backpressure and tears everything down on error.
 */
async function runETLPipeline() {
  const source = new NumberGenerator(1000);
  const keepEven = new FilterTransform((item) => item.number % 2 === 0); // Even numbers
  const enrich = new MapTransform((item) => ({
    ...item,
    squared: item.number ** 2,
    label: `Item #${item.number}`,
  }));
  const sink = new BatchInserter(50);

  await pipelineAsync(source, keepEven, enrich, sink);
  console.log("ETL pipeline complete");
}

// 4. Async iteration with readable streams
/**
 * Stream a text file chunk-by-chunk (for await...of) and log every line
 * containing "ERROR".
 *
 * Fix vs. original: the final line of a file without a trailing newline
 * was left in `lineBuffer` and never inspected; it is now processed after
 * the loop. Also returns the total line count (previously undefined —
 * backward-compatible for callers that ignore the return value).
 *
 * @param {string} filePath - path of the file to scan
 * @returns {Promise<number>} number of lines processed
 */
async function processFileByLine(filePath) {
  const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
  let lineBuffer = "";
  let lineCount = 0;

  const handleLine = (line) => {
    lineCount++;
    // Process each line
    if (line.includes("ERROR")) {
      console.log(`Line ${lineCount}: ${line}`);
    }
  };

  for await (const chunk of stream) {
    lineBuffer += chunk;
    const lines = lineBuffer.split("\n");
    lineBuffer = lines.pop(); // keep the trailing partial line for the next chunk
    lines.forEach(handleLine);
  }

  // Flush the last line when the file doesn't end with "\n".
  if (lineBuffer.length > 0) {
    handleLine(lineBuffer);
  }
  return lineCount;
}

// 5. Multi-step file processing pipeline
/**
 * Decompress a gzipped log file and write only the lines containing
 * "ERROR" to `outputPath`.
 *
 * Fix vs. original: lines were split per decompressed chunk, so a line
 * broken across two chunks could be missed (e.g. "ERR" + "OR ...") or
 * matched as fragments, and the final unterminated line was dropped.
 * The Transform now carries the trailing partial line between chunks
 * and flushes it at end-of-stream.
 */
async function processLogFile(inputPath, outputPath) {
  let remainder = ""; // partial line carried across chunk boundaries
  await pipelineAsync(
    // Read compressed log file
    fs.createReadStream(inputPath),
    // Decompress
    zlib.createGunzip(),
    // Filter complete lines containing "ERROR"
    new Transform({
      transform(chunk, encoding, callback) {
        const lines = (remainder + chunk.toString()).split("\n");
        remainder = lines.pop(); // last element may be an incomplete line
        const errorLines = lines.filter((line) => line.includes("ERROR"));
        if (errorLines.length > 0) this.push(errorLines.join("\n") + "\n");
        callback();
      },
      flush(callback) {
        // Emit the final unterminated line, if it matches.
        if (remainder.includes("ERROR")) this.push(remainder + "\n");
        callback();
      },
    }),
    // Write filtered output
    fs.createWriteStream(outputPath)
  );
  console.log("Log processing complete");
}

// 6. Readable.from() — Create streams from iterables
/**
 * Readable.from() turns any (async) iterable into a stream.
 * Here: an array of objects → NDJSON strings → stdout.
 */
async function streamFromArray() {
  const data = [
    { name: "Alice", score: 95 },
    { name: "Bob", score: 87 },
    { name: "Charlie", score: 92 },
  ];

  const ndjsonLines = data.map((record) => JSON.stringify(record) + "\n");
  for await (const chunk of Readable.from(ndjsonLines)) {
    process.stdout.write(chunk);
  }
}

// 7. Duplex stream — Echo server
const net = require("net");

/**
 * TCP echo server: each connection's socket is a Duplex stream, so we
 * read from and write to the same object.
 * @param {number} port - port to listen on (0 = OS-assigned)
 * @returns {net.Server} the listening server
 */
function createEchoServer(port) {
  const handleConnection = (socket) => {
    // socket is a Duplex stream
    console.log("Client connected");

    socket.on("data", (data) => {
      // Echo back what was received
      socket.write(`Echo: ${data}`);
    });

    socket.on("end", () => console.log("Client disconnected"));
    socket.on("error", (err) => console.error("Socket error:", err.message));
  };

  const server = net.createServer(handleConnection);
  server.listen(port, () => console.log(`Echo server on port ${port}`));
  return server;
}

// 8. Stream performance monitoring
/**
 * Pass-through Transform (byte mode) that counts bytes as chunks flow
 * by and, when the stream ends, logs MB processed, elapsed seconds and
 * throughput in MB/s.
 *
 * Fix vs. original: a stream that finished in under 1ms had elapsed = 0
 * and logged "Infinity MB/s" (or "NaN MB/s" for an empty stream).
 * Elapsed time is now clamped to a 1ms minimum.
 */
class MonitoredTransform extends Transform {
  constructor(name) {
    super();
    this.name = name;
    this.bytesProcessed = 0;
    this.startTime = Date.now();
  }

  _transform(chunk, encoding, callback) {
    this.bytesProcessed += chunk.length; // byte mode: chunks are Buffers
    this.push(chunk); // pass data through unchanged
    callback();
  }

  _flush(callback) {
    // Clamp to 1ms so very fast (or empty) streams don't divide by zero.
    const elapsed = Math.max(Date.now() - this.startTime, 1) / 1000;
    const mbProcessed = this.bytesProcessed / 1024 / 1024;
    console.log(
      `[${this.name}] Processed ${mbProcessed.toFixed(2)}MB in ${elapsed.toFixed(2)}s (${(mbProcessed / elapsed).toFixed(2)} MB/s)`
    );
    callback();
  }
}

🏋️ Practice Exercise

Exercises:

  1. Build an ETL pipeline with custom Readable, Transform, and Writable streams that processes a CSV and inserts into a database
  2. Create a streaming JSON parser that handles newline-delimited JSON (NDJSON) files
  3. Implement a rate-limiting Transform stream that limits throughput to N bytes per second
  4. Build a stream multiplexer that sends a readable stream to multiple writable destinations
  5. Use Readable.from() and async generators to create a stream from paginated API responses
  6. Write a performance-monitoring Transform stream that logs throughput (MB/s) and chunk counts

⚠️ Common Mistakes

  • Not using objectMode: true when passing JavaScript objects through streams — without it, streams expect Buffer/string and will throw or corrupt data

  • Creating custom streams without implementing _final() — this method is called before the stream closes; use it to flush remaining buffered data

  • Not destroying streams on error — use stream.destroy(err) to clean up resources; pipeline() does this automatically

  • Using readableStream.on('data') without handling backpressure — this puts the stream in 'flowing' mode and can overwhelm downstream consumers

  • Forgetting that stream.pipeline() destroys all streams on error — don't add additional logic to destroyed streams after a pipeline failure

💼 Interview Questions

🎤 Mock Interview

Mock interview is powered by AI for Advanced Stream Patterns. Login to unlock this feature.