Advanced Stream Patterns
📖 Concept
Beyond basic piping, streams unlock powerful data processing patterns for production applications: chaining, multiplexing, throttling, and async iteration.
Stream chaining (pipeline): Chain multiple Transform streams to create data processing pipelines — like Unix pipes:
readStream → decompress → decrypt → parse → filter → format → writeStream
Object mode streams:
By default, streams work with Buffers/strings. Setting objectMode: true lets streams pass JavaScript objects — essential for data processing pipelines.
Async iteration (for await...of): Since Node.js 10, readable streams are async iterables:
for await (const chunk of readableStream) {
```javascript
process.stdout.write(chunk);
}
```
Web Streams API (Node.js 16+):
Node.js now supports the Web Streams API (ReadableStream, WritableStream, TransformStream) — the same API used in browsers. This enables code sharing between Node.js and browsers.
When to use streams vs. buffers:
| Scenario | Use |
|---|---|
| File < 50MB | readFile (simpler) |
| File > 50MB | createReadStream (memory efficient) |
| Real-time data | Streams (WebSocket, SSE) |
| ETL pipelines | Transform streams |
| HTTP responses | Stream directly (pipe to response) |
| Log processing | Line-by-line stream reading |
🏠 Real-world analogy: Advanced streams are like an assembly line in a factory — each station (Transform) does one job, and items move automatically from station to station. You can add, remove, or rearrange stations without redesigning the whole factory.
💻 Code Example
```javascript
// Advanced Stream Patterns

const { Readable, Writable, Transform, pipeline } = require("stream");
const { promisify } = require("util");
const fs = require("fs");
const zlib = require("zlib");
const net = require("net");

const pipelineAsync = promisify(pipeline);

// 1. Custom Readable stream — Generate data on demand
class NumberGenerator extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.current = 0;
    this.max = max;
  }

  _read() {
    if (this.current >= this.max) {
      this.push(null); // Signal end of stream
      return;
    }
    // Simulate async data generation
    setTimeout(() => {
      this.push({ number: this.current++, timestamp: Date.now() });
    }, 10);
  }
}

// 2. Custom Writable stream — Database batch inserter
class BatchInserter extends Writable {
  constructor(batchSize = 100) {
    super({ objectMode: true });
    this.batch = [];
    this.batchSize = batchSize;
    this.totalInserted = 0;
  }

  async _write(record, encoding, callback) {
    this.batch.push(record);

    if (this.batch.length >= this.batchSize) {
      await this.flushBatch();
    }
    callback();
  }

  async _final(callback) {
    if (this.batch.length > 0) {
      await this.flushBatch();
    }
    console.log(`Total records inserted: ${this.totalInserted}`);
    callback();
  }

  async flushBatch() {
    // Simulate database batch insert
    console.log(`Inserting batch of ${this.batch.length} records...`);
    // await db.collection("data").insertMany(this.batch);
    this.totalInserted += this.batch.length;
    this.batch = [];
  }
}

// 3. Transform pipeline — ETL (Extract, Transform, Load)
class FilterTransform extends Transform {
  constructor(predicate) {
    super({ objectMode: true });
    this.predicate = predicate;
  }

  _transform(obj, encoding, callback) {
    if (this.predicate(obj)) {
      this.push(obj);
    }
    callback();
  }
}

class MapTransform extends Transform {
  constructor(mapper) {
    super({ objectMode: true });
    this.mapper = mapper;
  }

  _transform(obj, encoding, callback) {
    this.push(this.mapper(obj));
    callback();
  }
}

// Usage: Generate → Filter → Transform → Insert
async function runETLPipeline() {
  await pipelineAsync(
    new NumberGenerator(1000),
    new FilterTransform((item) => item.number % 2 === 0), // Even numbers
    new MapTransform((item) => ({
      ...item,
      squared: item.number ** 2,
      label: `Item #${item.number}`,
    })),
    new BatchInserter(50)
  );
  console.log("ETL pipeline complete");
}

// 4. Async iteration with readable streams
async function processFileByLine(filePath) {
  const stream = fs.createReadStream(filePath, { encoding: "utf-8" });
  let lineBuffer = "";
  let lineCount = 0;

  for await (const chunk of stream) {
    lineBuffer += chunk;
    const lines = lineBuffer.split("\n");
    lineBuffer = lines.pop(); // Keep the partial last line for the next chunk

    for (const line of lines) {
      lineCount++;
      // Process each line
      if (line.includes("ERROR")) {
        console.log(`Line ${lineCount}: ${line}`);
      }
    }
  }
}

// 5. Multi-step file processing pipeline
async function processLogFile(inputPath, outputPath) {
  await pipelineAsync(
    // Read compressed log file
    fs.createReadStream(inputPath),
    // Decompress
    zlib.createGunzip(),
    // Filter lines containing "ERROR"
    // NB: this assumes lines do not span chunk boundaries; for robustness,
    // buffer partial lines the way processFileByLine() above does
    new Transform({
      transform(chunk, encoding, callback) {
        const lines = chunk.toString().split("\n");
        const errorLines = lines
          .filter((line) => line.includes("ERROR"))
          .join("\n");
        if (errorLines) this.push(errorLines + "\n");
        callback();
      },
    }),
    // Write filtered output
    fs.createWriteStream(outputPath)
  );
  console.log("Log processing complete");
}

// 6. Readable.from() — Create streams from iterables
async function streamFromArray() {
  const data = [
    { name: "Alice", score: 95 },
    { name: "Bob", score: 87 },
    { name: "Charlie", score: 92 },
  ];

  const stream = Readable.from(data.map((d) => JSON.stringify(d) + "\n"));

  for await (const chunk of stream) {
    process.stdout.write(chunk);
  }
}

// 7. Duplex stream — Echo server
function createEchoServer(port) {
  const server = net.createServer((socket) => {
    // socket is a Duplex stream
    console.log("Client connected");

    socket.on("data", (data) => {
      // Echo back what was received
      socket.write(`Echo: ${data}`);
    });

    socket.on("end", () => console.log("Client disconnected"));
    socket.on("error", (err) => console.error("Socket error:", err.message));
  });

  server.listen(port, () => console.log(`Echo server on port ${port}`));
  return server;
}

// 8. Stream performance monitoring
class MonitoredTransform extends Transform {
  constructor(name) {
    super();
    this.name = name;
    this.bytesProcessed = 0;
    this.startTime = Date.now();
  }

  _transform(chunk, encoding, callback) {
    this.bytesProcessed += chunk.length;
    this.push(chunk);
    callback();
  }

  _flush(callback) {
    const elapsed = (Date.now() - this.startTime) / 1000;
    const mbProcessed = this.bytesProcessed / 1024 / 1024;
    console.log(
      `[${this.name}] Processed ${mbProcessed.toFixed(2)}MB in ${elapsed.toFixed(2)}s (${(mbProcessed / elapsed).toFixed(2)} MB/s)`
    );
    callback();
  }
}
```
🏋️ Practice Exercise
Exercises:
- Build an ETL pipeline with custom Readable, Transform, and Writable streams that processes a CSV and inserts into a database
- Create a streaming JSON parser that handles newline-delimited JSON (NDJSON) files
- Implement a rate-limiting Transform stream that limits throughput to N bytes per second
- Build a stream multiplexer that sends a readable stream to multiple writable destinations
- Use Readable.from() and async generators to create a stream from paginated API responses
- Write a performance-monitoring Transform stream that logs throughput (MB/s) and chunk counts
⚠️ Common Mistakes
- Not using objectMode: true when passing JavaScript objects through streams — without it, streams expect Buffer/string and will throw or corrupt data
- Creating custom streams without implementing _final() — this method is called before the stream closes; use it to flush remaining buffered data
- Not destroying streams on error — use stream.destroy(err) to clean up resources; pipeline() does this automatically
- Using readableStream.on('data') without handling backpressure — this puts the stream in 'flowing' mode and can overwhelm downstream consumers
- Forgetting that stream.pipeline() destroys all streams on error — don't add additional logic to destroyed streams after a pipeline failure