Streams and Buffers
Streams are one of Node.js's most powerful features. They allow you to process data piece by piece, without loading everything into memory. Understanding streams is essential for building efficient, scalable applications.
Why Streams Matter
Consider reading a 1GB file:
// BAD: Loads entire file into memory
const data = fs.readFileSync("huge-file.txt");
// Memory: 1GB used
// GOOD: Processes in chunks
const stream = fs.createReadStream("huge-file.txt");
stream.on("data", (chunk) => {
// Process 64KB at a time
// Memory: ~64KB used
});
Streams enable:
- Memory efficiency: Process data in chunks
- Time efficiency: Start processing before all data arrives
- Composability: Pipe streams together like Unix pipes (see the example just after this list)
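As a quick illustration of that composability, the sketch below is the streaming equivalent of the shell pipeline cat access.log | gzip > access.log.gz (the file names are placeholders):
const fs = require("fs");
const zlib = require("zlib");
// Each stage processes chunks as they arrive; nothing is fully buffered in memory
fs.createReadStream("access.log")
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("access.log.gz"));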
Buffer: The Building Block
Buffers hold binary data. Streams transfer data as Buffers.
Creating Buffers
// From string
const buf1 = Buffer.from("Hello, World!");
// Allocate empty buffer
const buf2 = Buffer.alloc(10); // Initialized to zeros
const buf3 = Buffer.allocUnsafe(10); // Faster, but may contain old data
// From array
const buf4 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);
Buffer Operations
const buf = Buffer.from("Hello, World!");
// Reading
console.log(buf.toString()); // 'Hello, World!'
console.log(buf.toString("utf8", 0, 5)); // 'Hello'
console.log(buf[0]); // 72 (ASCII for 'H')
console.log(buf.length); // 13
// Writing
buf.write("Hi"); // Writes at position 0
buf[0] = 0x4a; // Change 'H' to 'J'
// Slicing (shares memory!)
const slice = buf.subarray(0, 5); // References same memory; buf.slice() behaves the same but is deprecated
slice[0] = 0x58; // Also modifies buf!
// Copying (new memory)
const copy = Buffer.from(buf); // Independent copy
// Concatenating
const combined = Buffer.concat([buf1, buf2, buf3]);
Warning
Buffer.allocUnsafe() is faster but dangerous—it may contain sensitive data
from previous allocations. Always use Buffer.alloc() unless you immediately
overwrite all bytes.
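To make this concrete, here is a minimal sketch of the safe patterns: zero-fill with Buffer.alloc(), or overwrite every byte before reading when reaching for Buffer.allocUnsafe() (the sizes and values below are arbitrary):
const safe = Buffer.alloc(16);        // Zero-filled; nothing to leak
const fast = Buffer.allocUnsafe(16);  // May still hold old memory contents
fast.fill(0);                         // Option 1: wipe it immediately
const header = Buffer.allocUnsafe(4);
header.writeUInt32BE(0xdeadbeef, 0);  // Option 2: write every byte before any read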
Stream Types
Node.js has four fundamental stream types:
| Type | Description | Example |
|---|---|---|
| Readable | Source of data | fs.createReadStream, HTTP response |
| Writable | Destination for data | fs.createWriteStream, HTTP request |
| Duplex | Both readable and writable | TCP socket |
| Transform | Duplex that modifies data | zlib.createGzip() |
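For concreteness, each type in the table can be obtained from a core module; the paths, host, and port below are placeholders:
const fs = require("fs");
const net = require("net");
const zlib = require("zlib");
const readable = fs.createReadStream("input.txt");   // Readable: produces data
const writable = fs.createWriteStream("output.txt"); // Writable: consumes data
const socket = net.connect(443, "example.com");      // Duplex: readable and writable (TCP socket)
const gzip = zlib.createGzip();                      // Transform: modifies data in transit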
Readable Streams
Streams that produce data.
Consuming Readable Streams
const fs = require("fs");
const readable = fs.createReadStream("file.txt");
// Event-based (flowing mode)
readable.on("data", (chunk) => {
console.log("Received:", chunk.length, "bytes");
});
readable.on("end", () => {
console.log("No more data");
});
readable.on("error", (err) => {
console.error("Error:", err);
});
// Manual reading (paused mode)
readable.on("readable", () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log("Read:", chunk.length, "bytes");
}
});
Flowing vs Paused Mode
PAUSED MODE (default)
┌──────────────┐
│ Readable │ ──── .read() ────► Data
│ Stream │ (manual pull)
└──────────────┘
FLOWING MODE (activated by 'data' listener or .pipe())
┌──────────────┐
│ Readable │ ════════════════► Data
│ Stream │ (automatic push)
└──────────────┘
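A small sketch of switching between the two modes (the file name is a placeholder): attaching a 'data' listener puts the stream into flowing mode, pause() stops the 'data' events, and resume() restarts them.
const fs = require("fs");
const readable = fs.createReadStream("file.txt");
readable.on("data", (chunk) => {            // Adding a 'data' listener switches to flowing mode
  console.log("chunk:", chunk.length);
  readable.pause();                         // Back to paused: no 'data' events for now
  setTimeout(() => readable.resume(), 100); // Re-enter flowing mode after a short delay
});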
Creating Readable Streams
const { Readable } = require("stream");
// Method 1: Push data
const readable = new Readable({
read(size) {
this.push("Hello ");
this.push("World!");
this.push(null); // Signal end
},
});
// Method 2: From iterable
const readable = Readable.from(["Hello ", "World!"]);
// Method 3: Async generator
async function* generate() {
yield "Hello ";
yield "World!";
}
const readable = Readable.from(generate());
Custom Readable: Random Number Generator
const { Readable } = require("stream");
class RandomStream extends Readable {
constructor(options = {}) {
super(options);
this.count = options.count || 10;
this.emitted = 0;
}
_read() {
if (this.emitted >= this.count) {
this.push(null); // End stream
return;
}
const random = Math.random().toString() + "\n";
this.push(random);
this.emitted++;
}
}
const randomStream = new RandomStream({ count: 5 });
randomStream.pipe(process.stdout);
Writable Streams
Streams that consume data.
Writing to Streams
const fs = require("fs");
const writable = fs.createWriteStream("output.txt");
// Write data
writable.write("Hello ");
writable.write("World!");
writable.end("\nGoodbye!"); // Final write + close
// Events
writable.on("finish", () => console.log("All data written"));
writable.on("error", (err) => console.error("Error:", err));The write() Return Value
write() returns false when the internal buffer is full—this is the backpressure signal:
const writable = fs.createWriteStream("output.txt");
for (let i = 0; i < 1000000; i++) {
const canContinue = writable.write(`Line ${i}\n`);
if (!canContinue) {
// Buffer is full! Wait for drain
await new Promise((resolve) => writable.once("drain", resolve));
}
}
writable.end();
Creating Writable Streams
const { Writable } = require("stream");
const writable = new Writable({
write(chunk, encoding, callback) {
console.log("Received:", chunk.toString());
callback(); // Call when done processing
},
});
writable.write("Hello");
writable.write("World");
writable.end();
Custom Writable: HTTP Uploader
const { Writable } = require("stream");
const https = require("https");
class UploadStream extends Writable {
constructor(url, options = {}) {
super(options);
this.url = url;
this.chunks = [];
}
_write(chunk, encoding, callback) {
this.chunks.push(chunk);
callback();
}
_final(callback) {
const data = Buffer.concat(this.chunks);
const req = https.request(
this.url,
{
method: "POST",
headers: { "Content-Length": data.length },
},
(res) => {
if (res.statusCode === 200) {
callback();
} else {
callback(new Error(`Upload failed: ${res.statusCode}`));
}
},
);
req.write(data);
req.end();
}
}
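A hypothetical usage sketch; the file name and URL are placeholders, and because this class buffers all chunks before sending, it is only suitable for small payloads:
const fs = require("fs");
fs.createReadStream("report.csv")
  .pipe(new UploadStream("https://example.com/upload"))
  .on("finish", () => console.log("Upload complete"))
  .on("error", (err) => console.error("Upload failed:", err));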
Transform Streams
Transform streams modify data as it passes through.
const { Transform } = require("stream");
const upperCase = new Transform({
transform(chunk, encoding, callback) {
const upper = chunk.toString().toUpperCase();
this.push(upper);
callback();
},
});
process.stdin.pipe(upperCase).pipe(process.stdout);
Custom Transform: JSON Parser
const { Transform } = require("stream");
class JSONParser extends Transform {
constructor() {
super({ objectMode: true }); // Output objects, not buffers
this.buffer = "";
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
// Find complete JSON objects (newline-delimited)
const lines = this.buffer.split("\n");
this.buffer = lines.pop(); // Keep incomplete line
for (const line of lines) {
if (line.trim()) {
try {
const obj = JSON.parse(line);
this.push(obj);
} catch (err) {
this.emit("error", err);
}
}
}
callback();
}
_flush(callback) {
// Process remaining buffer
if (this.buffer.trim()) {
try {
this.push(JSON.parse(this.buffer));
} catch (err) {
this.emit("error", err);
}
}
callback();
}
}
// Usage
fs.createReadStream("data.ndjson")
.pipe(new JSONParser())
.on("data", (obj) => console.log(obj));Piping Streams
The pipe() method connects streams:
// Basic pipe
readable.pipe(writable);
// Chain multiple transforms
fs.createReadStream("input.txt")
.pipe(zlib.createGzip())
.pipe(crypto.createCipheriv("aes-192-cbc", key, iv)) // createCipher() is deprecated; key (24 bytes) and iv (16 bytes) prepared elsewhere
.pipe(fs.createWriteStream("output.gz.enc"));
// pipe() returns the destination, enabling chaining
Error Handling with Pipes
pipe() doesn't propagate errors! Each stream needs its own error handler:
// BAD: Only catches writable errors
readable.pipe(transform).pipe(writable).on("error", handleError); // Misses readable/transform errors!
// GOOD: Handle all errors
readable.on("error", handleError);
transform.on("error", handleError);
writable.on("error", handleError);
readable.pipe(transform).pipe(writable);
pipeline(): The Modern Alternative
Use pipeline() for proper error handling and cleanup:
const { pipeline } = require("stream/promises");
await pipeline(
fs.createReadStream("input.txt"),
zlib.createGzip(),
fs.createWriteStream("output.gz"),
);
// Automatically handles errors and cleanup
// With callback (older API)
const { pipeline } = require("stream");
pipeline(readable, transform, writable, (err) => {
if (err) console.error("Pipeline failed:", err);
else console.log("Pipeline succeeded");
});
Backpressure
Backpressure prevents fast producers from overwhelming slow consumers.
The Problem
┌─────────────┐ 1000 MB/s ┌─────────────┐ 10 MB/s ┌─────────────┐
│ Fast │ ────────────► │ Transform │ ──────────► │ Slow │
│ Source │ │ (buffer) │ │ Sink │
└─────────────┘ └─────────────┘ └─────────────┘
│
Memory grows!
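The failure mode is easy to reproduce: when the destination is slower than the source and the return value of write() is ignored, unflushed chunks accumulate in the writable's internal buffer (the paths below are placeholders):
const fs = require("fs");
const source = fs.createReadStream("huge-file.bin");        // Fast producer
const sink = fs.createWriteStream("copy-on-slow-disk.bin"); // Slower consumer
source.on("data", (chunk) => {
  // write()'s return value is ignored, so the source is never paused
  // and chunks the sink has not flushed yet pile up in memory.
  sink.write(chunk);
});
source.on("end", () => sink.end());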
How pipe() Handles It
pipe() automatically manages backpressure:
- When the writable's internal buffer is full, write() returns false
- pipe() pauses the readable stream
- When the writable drains, it emits 'drain'
- pipe() resumes the readable stream
Manual Backpressure
When not using pipe():
const readable = fs.createReadStream("large-file.txt");
const writable = fs.createWriteStream("output.txt");
readable.on("data", (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
readable.pause();
writable.once("drain", () => readable.resume());
}
});
readable.on("end", () => writable.end());Warning
Always handle backpressure in production code. Ignoring it leads to memory exhaustion and crashes under load.
Object Mode
By default, streams work with Buffers/strings. Object mode allows any JavaScript value:
const { Transform } = require("stream");
const parseJSON = new Transform({
objectMode: true, // Enable object mode
transform(line, encoding, callback) {
try {
const obj = JSON.parse(line);
this.push(obj); // Push object, not buffer
callback();
} catch (err) {
callback(err);
}
},
});
// Usage
parseJSON.write('{"name": "Alice"}');
parseJSON.on("data", (obj) => {
console.log(obj.name); // 'Alice'
});
Mixed Mode Streams
You can have different modes for read and write sides:
const { Transform } = require("stream");
const objectToBuffer = new Transform({
writableObjectMode: true, // Accept objects
readableObjectMode: false, // Output buffers
transform(obj, encoding, callback) {
this.push(JSON.stringify(obj) + "\n");
callback();
},
});
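A brief usage sketch, assuming the serialized output should land in an NDJSON file (the file name and objects are placeholders):
const fs = require("fs");
objectToBuffer.pipe(fs.createWriteStream("events.ndjson"));
objectToBuffer.write({ type: "login", user: "alice" });  // Objects in...
objectToBuffer.write({ type: "logout", user: "alice" });
objectToBuffer.end();                                     // ...newline-delimited JSON out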
Practical Patterns
Processing Large Files
const { createReadStream } = require("fs");
const { createInterface } = require("readline");
async function processLargeFile(filepath) {
const stream = createReadStream(filepath);
const rl = createInterface({ input: stream });
let lineNumber = 0;
for await (const line of rl) {
lineNumber++;
// Process each line without loading entire file
await processLine(line);
}
console.log(`Processed ${lineNumber} lines`);
}
HTTP Streaming
const http = require("http");
const fs = require("fs");
http
.createServer((req, res) => {
// Stream file directly to response
const stream = fs.createReadStream("video.mp4");
res.setHeader("Content-Type", "video/mp4");
stream.pipe(res);
stream.on("error", (err) => {
res.statusCode = 500;
res.end("Error streaming file");
});
})
.listen(3000);
Compression Pipeline
const { pipeline } = require("stream/promises");
const { createReadStream, createWriteStream } = require("fs");
const { createGzip, createGunzip } = require("zlib");
// Compress
await pipeline(
createReadStream("data.json"),
createGzip(),
createWriteStream("data.json.gz"),
);
// Decompress
await pipeline(
createReadStream("data.json.gz"),
createGunzip(),
createWriteStream("data-restored.json"),
);
Progress Tracking
const { Transform } = require("stream");
function createProgressStream(total) {
let transferred = 0;
return new Transform({
transform(chunk, encoding, callback) {
transferred += chunk.length;
const percent = ((transferred / total) * 100).toFixed(1);
process.stdout.write(`\rProgress: ${percent}%`);
callback(null, chunk);
},
});
}
// Usage
const fs = require("fs");
const { pipeline } = require("stream/promises");
const { size } = fs.statSync("large-file.zip");
await pipeline(
fs.createReadStream("large-file.zip"),
createProgressStream(size),
fs.createWriteStream("copy.zip"),
);
Summary
Streams are essential for efficient data handling:
| Stream Type | Purpose | Examples |
|---|---|---|
| Readable | Produce data | File read, HTTP response |
| Writable | Consume data | File write, HTTP request |
| Transform | Modify data | Compression, encryption |
| Duplex | Both directions | TCP socket |
Key concepts:
- Buffers hold binary data—the currency of streams
- Backpressure prevents memory exhaustion
- pipeline() handles errors and cleanup properly
- Object mode allows streaming JavaScript objects
- Flowing vs paused modes determine how data is consumed from a readable
Best practices:
- Use pipeline() instead of pipe() for error handling
- Always handle backpressure in custom implementations
- Prefer streams for large data or network operations
- Use object mode for structured data processing
Note
Understanding streams is what separates Node.js beginners from experts. They're used throughout the ecosystem—HTTP, file system, databases, compression, and more.