Streams — Fundamentals

📖 Concept

Streams are one of Node.js's most powerful features — they allow you to process data piece by piece without loading the entire thing into memory. This is critical for large files, network data, and real-time processing.

Why streams matter: Without streams, reading a 2GB file requires 2GB of RAM. With streams, you process it in ~64KB chunks, using almost no memory.

Four types of streams:

Type Description Examples
Readable Source of data fs.createReadStream, http.IncomingMessage, process.stdin
Writable Destination for data fs.createWriteStream, http.ServerResponse, process.stdout
Duplex Both readable and writable net.Socket, tls.TLSSocket
Transform Modify data as it passes through zlib.createGzip, crypto.createHash

Readable stream events:

Event Description
data A chunk of data is available
end No more data to read
error An error occurred
close Stream and resources released

Writable stream events:

Event Description
drain Buffer is empty, safe to write more
finish All data has been flushed
error An error occurred
close Stream and resources released

Backpressure: When a writable stream can't keep up with the readable stream, data accumulates in an internal buffer. The pipe() method handles backpressure automatically — it pauses the readable when the writable is full and resumes when it drains.

🏠 Real-world analogy: Streams are like a water pipeline. Water (data) flows from the source (readable) through pipes (transform) to the destination (writable). You don't need a swimming pool (memory) to move water from one place to another.

💻 Code Example

Code example:
// Node.js Streams — Fundamentals

const fs = require("fs");
const path = require("path");
const { Transform, pipeline } = require("stream");
const { promisify } = require("util");
const zlib = require("zlib");

// Promisified pipeline for async/await usage.
// (Node >= 15 could use require("stream/promises").pipeline directly.)
const pipelineAsync = promisify(pipeline);
11// 1. Reading a large file with streams (memory efficient)
// 1. Reading a large file with streams (memory efficient).
// Counts lines chunk-by-chunk so peak memory stays near the highWaterMark.
// Returns a Promise that resolves with the total line count (previously the
// function returned undefined and only logged — callers had no way to know
// when reading finished). Rejects on stream errors.
function readLargeFile(filePath) {
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream(filePath, {
      encoding: "utf-8",
      highWaterMark: 64 * 1024, // 64KB chunks (default)
    });

    let lineCount = 0;
    let leftover = ""; // partial line carried between chunks

    stream.on("data", (chunk) => {
      leftover += chunk;
      const lines = leftover.split("\n");
      leftover = lines.pop(); // keep incomplete last line for the next chunk
      lineCount += lines.length;
    });

    stream.on("end", () => {
      if (leftover) lineCount++; // count final line without trailing newline
      console.log(`Total lines: ${lineCount}`);
      resolve(lineCount);
    });

    stream.on("error", (err) => {
      console.error("Stream error:", err.message);
      reject(err);
    });
  });
}
37
38// 2. Writing with streams
// 2. Writing with streams, honoring backpressure.
// Writes each item of `data` as one JSON line. The original version only
// *logged* when write() returned false and kept writing anyway, so the
// internal buffer could grow without bound; this version pauses until the
// 'drain' event before writing more. Returns a Promise that resolves once
// all data is flushed ('finish'), rejecting on write errors.
function writeDataStream(filePath, data) {
  const stream = fs.createWriteStream(filePath);

  return new Promise((resolve, reject) => {
    stream.on("error", reject);
    stream.on("finish", () => {
      console.log("Write complete");
      resolve();
    });

    let i = 0;
    const writeNext = () => {
      while (i < data.length) {
        const canContinue = stream.write(JSON.stringify(data[i]) + "\n");
        i++;
        if (!canContinue) {
          // Buffer is full — resume once the writable drains.
          stream.once("drain", writeNext);
          return;
        }
      }
      stream.end(); // signal that writing is complete
    };
    writeNext();
  });
}
55
56// 3. pipe() — Connect readable → writable
// 3. pipe() — Connect readable → writable.
// Copies src to dest. pipe() does not forward errors or clean up the other
// stream on failure, so the original version leaked a file descriptor when
// either side errored; here both streams are destroyed on any error.
// Returns a Promise resolving when the copy finishes (backward compatible:
// previous callers ignored the undefined return value).
function copyFile(src, dest) {
  const readStream = fs.createReadStream(src);
  const writeStream = fs.createWriteStream(dest);

  return new Promise((resolve, reject) => {
    const fail = (label) => (err) => {
      // pipe() will not do this for us — release both ends explicitly.
      readStream.destroy();
      writeStream.destroy();
      console.error(label, err);
      reject(err);
    };

    readStream.on("error", fail("Read error:"));
    writeStream.on("error", fail("Write error:"));
    writeStream.on("finish", () => {
      console.log("Copy complete");
      resolve();
    });

    readStream.pipe(writeStream);
  });
}
67
68// 4. pipeline() — Better pipe with error handling (recommended)
// 4. pipeline() — Better pipe with error handling (recommended).
// Gzip-compresses inputPath into outputPath; pipeline() wires up
// backpressure and tears all three streams down on any failure.
async function compressFile(inputPath, outputPath) {
  const source = fs.createReadStream(inputPath);
  const gzip = zlib.createGzip();
  const destination = fs.createWriteStream(outputPath);

  try {
    await pipelineAsync(source, gzip, destination);
    console.log("Compression complete");
  } catch (err) {
    console.error("Pipeline failed:", err.message);
  }
}
81
82// 5. Custom Transform stream — CSV to JSON
// 5. Custom Transform stream — CSV to JSON.
// The first non-empty row becomes the header; every later row is pushed
// downstream as a JSON string followed by a newline. The row-parsing logic
// was previously duplicated verbatim in _transform and _flush; it now lives
// in a single _emitRow helper.
class CSVToJSON extends Transform {
  constructor(options) {
    // NOTE(review): objectMode is kept for interface compatibility, though
    // this transform only ever emits strings — confirm callers rely on it.
    super({ ...options, objectMode: true });
    this.headers = null; // column names taken from the first row
    this.buffer = "";    // trailing partial line carried between chunks
  }

  // Parse one complete CSV line. Blank lines are skipped; the first real
  // line becomes the header row; data rows are pushed as JSON + "\n".
  _emitRow(line) {
    if (!line.trim()) return;
    const values = line.split(",").map((v) => v.trim());

    if (!this.headers) {
      this.headers = values;
      return;
    }

    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header] = values[i] || ""; // pad short rows with empty strings
    });
    this.push(JSON.stringify(obj) + "\n");
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop(); // keep incomplete line for the next chunk

    for (const line of lines) this._emitRow(line);
    callback();
  }

  _flush(callback) {
    // Emit the final line when the input lacked a trailing newline.
    if (this.buffer.trim() && this.headers) this._emitRow(this.buffer);
    callback();
  }
}
126
127// Usage: Convert CSV file to JSON
// Usage: stream a CSV file through the CSVToJSON transform into a
// newline-delimited JSON file.
async function convertCSVtoJSON(csvPath, jsonPath) {
  const source = fs.createReadStream(csvPath);
  const sink = fs.createWriteStream(jsonPath);
  await pipelineAsync(source, new CSVToJSON(), sink);
  console.log("CSV → JSON conversion complete");
}
136
137// 6. Stream-based file copy with progress
// 6. Stream-based file copy with progress reporting.
// Copies src → dest, printing the running percentage to stdout. Resolves on
// 'finish'. On any error both streams are now destroyed before rejecting —
// the original left the sibling stream open, leaking its file descriptor
// and holding a partially-written destination file.
function copyWithProgress(src, dest) {
  return new Promise((resolve, reject) => {
    // statSync throwing here rejects the promise via the executor.
    const stat = fs.statSync(src);
    let copied = 0;

    const readStream = fs.createReadStream(src);
    const writeStream = fs.createWriteStream(dest);

    const fail = (err) => {
      readStream.destroy();
      writeStream.destroy();
      reject(err);
    };

    readStream.on("data", (chunk) => {
      copied += chunk.length;
      const percent = ((copied / stat.size) * 100).toFixed(1);
      process.stdout.write(`\rCopying: ${percent}%`);
    });

    readStream.pipe(writeStream);
    writeStream.on("finish", () => {
      console.log("\nCopy complete!");
      resolve();
    });
    readStream.on("error", fail);
    writeStream.on("error", fail);
  });
}
161
162// 7. Memory comparison: Buffer vs Stream
// 7. Memory comparison: Buffer vs Stream (illustrative — no runtime effect).
async function memoryComparison() {
  // ❌ Buffered approach — the entire file must fit in RAM at once:
  // const data = await fs.promises.readFile("huge-file.csv", "utf-8");
  // const lines = data.split("\n"); // memory doubles: source string + array
  //
  // ✅ Streaming approach — chunks are processed and discarded as they pass:
  // const stream = fs.createReadStream("huge-file.csv", "utf-8");
  // Peak usage stays near the 64KB highWaterMark regardless of file size.
}

🏋️ Practice Exercise

Exercises:

  1. Create a stream-based file copy function with progress reporting (percentage complete)
  2. Build a Transform stream that converts uppercase text to titled case
  3. Use pipeline() to read a file, compress it with gzip, and write the compressed output
  4. Create a line-counting stream that counts lines without loading the entire file into memory
  5. Implement a CSV parser as a Transform stream that emits parsed row objects
  6. Build a stream that filters JSON lines — only passing through objects matching a condition

⚠️ Common Mistakes

  • Using fs.readFile() for large files — this loads the entire file into memory; use createReadStream() instead for files over ~50MB

  • Ignoring backpressure — writing to a stream faster than it can drain causes memory bloat; use pipeline() or check .write() return value

  • Using pipe() without error handling — errors in piped streams are not forwarded; use pipeline() which handles errors and cleanup automatically

  • Forgetting to call stream.end() on writable streams — without .end(), the 'finish' event never fires and resources aren't released

  • Not handling the 'error' event on streams — unhandled stream errors crash the process; always listen for errors on every stream

💼 Interview Questions

🎤 Mock Interview

Mock interview is powered by AI for Streams — Fundamentals. Login to unlock this feature.