Streams — Fundamentals
📖 Concept
Streams are one of Node.js's most powerful features — they allow you to process data piece by piece without loading the entire thing into memory. This is critical for large files, network data, and real-time processing.
Why streams matter: Without streams, reading a 2GB file requires 2GB of RAM. With streams, you process it in ~64KB chunks, using almost no memory.
Four types of streams:
| Type | Description | Examples |
|---|---|---|
| Readable | Source of data | fs.createReadStream, http.IncomingMessage, process.stdin |
| Writable | Destination for data | fs.createWriteStream, http.ServerResponse, process.stdout |
| Duplex | Both readable and writable | net.Socket, tls.TLSSocket |
| Transform | Modify data as it passes through | zlib.createGzip, crypto.createHash |
Readable stream events:
| Event | Description |
|---|---|
| data | A chunk of data is available |
| end | No more data to read |
| error | An error occurred |
| close | Stream closed and resources released |
Writable stream events:
| Event | Description |
|---|---|
| drain | Buffer is empty, safe to write more |
| finish | All data has been flushed |
| error | An error occurred |
| close | Stream closed and resources released |
Backpressure:
When a writable stream can't keep up with the readable stream, data accumulates in an internal buffer. The pipe() method handles backpressure automatically — it pauses the readable when the writable is full and resumes when it drains.
🏠 Real-world analogy: Streams are like a water pipeline. Water (data) flows from the source (readable) through pipes (transform) to the destination (writable). You don't need a swimming pool (memory) to move water from one place to another.
💻 Code Example
// Node.js Streams — Fundamentals

const fs = require("fs");
const path = require("path");
const { Transform, pipeline } = require("stream");
const { promisify } = require("util");
const zlib = require("zlib");

const pipelineAsync = promisify(pipeline);

// 1. Reading a large file with streams (memory efficient)
//
// Counts lines while never holding more than one chunk (plus one partial
// line) in memory. Returns a Promise resolving to the line count so
// callers can await the result (previously the count was only logged).
function readLargeFile(filePath) {
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream(filePath, {
      encoding: "utf-8",
      highWaterMark: 64 * 1024, // 64KB chunks (default)
    });

    let lineCount = 0;
    let buffer = "";

    stream.on("data", (chunk) => {
      buffer += chunk;
      const lines = buffer.split("\n");
      buffer = lines.pop(); // Keep incomplete last line for the next chunk
      lineCount += lines.length;
    });

    stream.on("end", () => {
      if (buffer) lineCount++; // Count a trailing line with no newline
      console.log(`Total lines: ${lineCount}`);
      resolve(lineCount);
    });

    stream.on("error", (err) => {
      console.error("Stream error:", err.message);
      reject(err);
    });
  });
}

// 2. Writing with streams
//
// Writes each item as one JSON line. Unlike the naive version, this
// honors backpressure: when .write() returns false we wait for the
// 'drain' event before writing more, so the internal buffer cannot grow
// without bound. Resolves once all data is flushed ('finish'); rejects
// on stream error.
async function writeDataStream(filePath, data) {
  const stream = fs.createWriteStream(filePath);

  // Register 'finish'/'error' handlers up front so no event is missed.
  const done = new Promise((resolve, reject) => {
    stream.on("finish", () => {
      console.log("Write complete");
      resolve();
    });
    stream.on("error", reject);
  });

  for (let i = 0; i < data.length; i++) {
    const canContinue = stream.write(JSON.stringify(data[i]) + "\n");

    if (!canContinue) {
      // Buffer is full — pause until the writable drains.
      console.log("Backpressure detected at item", i);
      await new Promise((resolve) => stream.once("drain", resolve));
    }
  }

  stream.end(); // Signal that writing is complete
  return done;
}

// 3. pipe() — Connect readable → writable
//
// pipe() handles backpressure automatically, but it does NOT forward
// errors, so each stream needs its own 'error' listener (or use
// pipeline(), which is demonstrated next). Resolves when the copy has
// been fully flushed; on a read error the write stream is destroyed so
// its file handle is not leaked.
function copyFile(src, dest) {
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(src);
    const writeStream = fs.createWriteStream(dest);

    readStream.pipe(writeStream);

    writeStream.on("finish", () => {
      console.log("Copy complete");
      resolve();
    });
    readStream.on("error", (err) => {
      console.error("Read error:", err);
      writeStream.destroy(); // Don't leak the open write handle
      reject(err);
    });
    writeStream.on("error", (err) => {
      console.error("Write error:", err);
      reject(err);
    });
  });
}
// 4. pipeline() — Better pipe with error handling (recommended)
//
// pipeline() forwards errors from every stage and destroys all streams
// on failure — plain pipe() does neither.
async function compressFile(inputPath, outputPath) {
  try {
    await pipelineAsync(
      fs.createReadStream(inputPath),
      zlib.createGzip(),
      fs.createWriteStream(outputPath)
    );
    console.log("Compression complete");
  } catch (err) {
    console.error("Pipeline failed:", err.message);
  }
}

// 5. Custom Transform stream — CSV to JSON
//
// Accepts raw CSV text/bytes and emits one JSON string per data row
// (newline-delimited). The first non-empty line is the header row.
class CSVToJSON extends Transform {
  constructor(options) {
    super({ ...options, objectMode: true });
    this.headers = null; // column names taken from the first row
    this.buffer = "";    // carries a partial line between chunks
  }

  // Convert one CSV data line into a JSON string keyed by the headers.
  // Missing trailing fields become "" so every row has every column.
  // (Shared by _transform and _flush — previously duplicated verbatim.)
  _rowToJSON(line) {
    const values = line.split(",").map((v) => v.trim());
    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header] = values[i] || "";
    });
    return JSON.stringify(obj) + "\n";
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop(); // Keep incomplete line for the next chunk

    for (const line of lines) {
      if (!line.trim()) continue;

      if (!this.headers) {
        // First non-empty line defines the columns; it is not data.
        this.headers = line.split(",").map((v) => v.trim());
        continue;
      }
      this.push(this._rowToJSON(line));
    }
    callback();
  }

  _flush(callback) {
    // Emit a final row if the input didn't end with a newline.
    if (this.buffer.trim() && this.headers) {
      this.push(this._rowToJSON(this.buffer));
    }
    callback();
  }
}

// Usage: Convert a CSV file to newline-delimited JSON
async function convertCSVtoJSON(csvPath, jsonPath) {
  await pipelineAsync(
    fs.createReadStream(csvPath),
    new CSVToJSON(),
    fs.createWriteStream(jsonPath)
  );
  console.log("CSV → JSON conversion complete");
}
// 6. Stream-based file copy with progress
//
// Copies src → dest while printing percentage progress to stdout.
// Resolves when the copy is fully flushed; rejects on any stream error.
// Fixes vs. the naive version: on error BOTH streams are destroyed so
// no file handle is leaked, and the percent computation guards against
// a zero-size denominator.
function copyWithProgress(src, dest) {
  return new Promise((resolve, reject) => {
    // statSync throwing here rejects the Promise (executor semantics).
    const stat = fs.statSync(src);
    let copied = 0;

    const readStream = fs.createReadStream(src);
    const writeStream = fs.createWriteStream(dest);

    const fail = (err) => {
      readStream.destroy();
      writeStream.destroy();
      reject(err);
    };

    readStream.on("data", (chunk) => {
      copied += chunk.length;
      const percent =
        stat.size === 0 ? "100.0" : ((copied / stat.size) * 100).toFixed(1);
      process.stdout.write(`\rCopying: ${percent}%`);
    });

    readStream.pipe(writeStream);
    writeStream.on("finish", () => {
      console.log("\nCopy complete!");
      resolve();
    });
    readStream.on("error", fail);
    writeStream.on("error", fail);
  });
}

// 7. Memory comparison: Buffer vs Stream
async function memoryComparison() {
  // ❌ Buffer approach — loads entire file into memory
  // const data = await fs.promises.readFile("huge-file.csv", "utf-8");
  // const lines = data.split("\n"); // DOUBLE memory (original + split array)

  // ✅ Stream approach — processes line by line
  // const stream = fs.createReadStream("huge-file.csv", "utf-8");
  // Uses ~64KB regardless of file size
}
🏋️ Practice Exercise
Exercises:
- Create a stream-based file copy function with progress reporting (percentage complete)
- Build a Transform stream that converts uppercase text to title case
- Use pipeline() to read a file, compress it with gzip, and write the compressed output
- Create a line-counting stream that counts lines without loading the entire file into memory
- Implement a CSV parser as a Transform stream that emits parsed row objects
- Build a stream that filters JSON lines — only passing through objects matching a condition
⚠️ Common Mistakes
- Using fs.readFile() for large files — this loads the entire file into memory; use createReadStream() instead for files over ~50MB
- Ignoring backpressure — writing to a stream faster than it can drain causes memory bloat; use pipeline() or check the .write() return value
- Using pipe() without error handling — errors in piped streams are not forwarded; use pipeline(), which handles errors and cleanup automatically
- Forgetting to call stream.end() on writable streams — without .end(), the 'finish' event never fires and resources aren't released
- Not handling the 'error' event on streams — unhandled stream errors crash the process; always listen for errors on every stream
💼 Interview Questions
🎤 Mock Interview
Mock interview is powered by AI for Streams — Fundamentals. Login to unlock this feature.