Streams and Buffers

Streams are one of Node.js's most powerful features. They allow you to process data piece by piece, without loading everything into memory. Understanding streams is essential for building efficient, scalable applications.

Why Streams Matter

Consider reading a 1GB file:

JAVASCRIPT
const fs = require("fs");
 
// BAD: Loads entire file into memory
const data = fs.readFileSync("huge-file.txt");
// Memory: 1GB used
 
// GOOD: Processes in chunks
const stream = fs.createReadStream("huge-file.txt");
stream.on("data", (chunk) => {
  // Process 64KB at a time
  // Memory: ~64KB used
});

Streams enable:

  • Memory efficiency: Process data in chunks
  • Time efficiency: Start processing before all data arrives
  • Composability: Pipe streams together like Unix pipes
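
As a taste of that composability, here is a minimal sketch (file names are illustrative) that mirrors the shell pipeline cat access.log | gzip > access.log.gz:

JAVASCRIPT
const fs = require("fs");
const zlib = require("zlib");
 
// Read -> compress -> write, streaming chunk by chunk
fs.createReadStream("access.log")
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("access.log.gz"));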

Buffer: The Building Block

Buffers hold binary data. Streams transfer data as Buffers.

Creating Buffers

JAVASCRIPT
// From string
const buf1 = Buffer.from("Hello, World!");
 
// Allocate empty buffer
const buf2 = Buffer.alloc(10); // Initialized to zeros
const buf3 = Buffer.allocUnsafe(10); // Faster, but may contain old data
 
// From array
const buf4 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);

Buffer Operations

JAVASCRIPT
const buf = Buffer.from("Hello, World!");
 
// Reading
console.log(buf.toString()); // 'Hello, World!'
console.log(buf.toString("utf8", 0, 5)); // 'Hello'
console.log(buf[0]); // 72 (ASCII for 'H')
console.log(buf.length); // 13
 
// Writing
buf.write("Hi"); // Writes at position 0
buf[0] = 0x4a; // Change 'H' to 'J'
 
// Slicing (shares memory!)
const slice = buf.slice(0, 5); // References same memory
slice[0] = 0x58; // Also modifies buf!
 
// Copying (new memory)
const copy = Buffer.from(buf); // Independent copy
 
// Concatenating (returns a new buffer containing both)
const combined = Buffer.concat([buf, copy]);

Warning

Buffer.allocUnsafe() is faster but dangerous—it may contain sensitive data from previous allocations. Always use Buffer.alloc() unless you immediately overwrite all bytes.
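
A minimal sketch of the safe pattern: if you do reach for allocUnsafe(), make sure every byte is overwritten before anything reads the buffer.

JAVASCRIPT
const source = Buffer.from("known data");
 
// Safe default: zero-filled on allocation
const safe = Buffer.alloc(1024);
 
// allocUnsafe() is only acceptable when you overwrite the contents immediately
const target = Buffer.allocUnsafe(source.length);
source.copy(target); // target now contains only known data
 
// Alternatively, zero it yourself before use
const scratch = Buffer.allocUnsafe(1024).fill(0);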


Stream Types

Node.js has four fundamental stream types:

  Type        Description                  Example
  Readable    Source of data               fs.createReadStream, HTTP response
  Writable    Destination for data         fs.createWriteStream, HTTP request
  Duplex      Both readable and writable   TCP socket
  Transform   Duplex that modifies data    zlib.createGzip()
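
Readable, Writable, and Transform each get a dedicated section below, but Duplex does not, so here is a minimal sketch of one. The key point is that its two sides are independent (unlike a Transform, where output is derived from input); the counter and logging behavior here are purely illustrative.

JAVASCRIPT
const { Duplex } = require("stream");
 
const duplex = new Duplex({
  // Readable side: produce three ticks, then end
  read() {
    this.count = (this.count || 0) + 1;
    if (this.count > 3) {
      this.push(null); // end the readable side
    } else {
      this.push(`tick ${this.count}\n`);
    }
  },
  // Writable side: independent of the readable side, just log what arrives
  write(chunk, encoding, callback) {
    console.log("writable side got:", chunk.toString());
    callback();
  },
});
 
duplex.pipe(process.stdout); // consume the readable side
duplex.write("hello"); // feed the writable side
duplex.end();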

Readable Streams

Streams that produce data.

Consuming Readable Streams

JAVASCRIPT
const fs = require("fs");
const readable = fs.createReadStream("file.txt");
 
// Event-based (flowing mode)
readable.on("data", (chunk) => {
  console.log("Received:", chunk.length, "bytes");
});
readable.on("end", () => {
  console.log("No more data");
});
readable.on("error", (err) => {
  console.error("Error:", err);
});
 
// Manual reading (paused mode). Use this OR the 'data' handler above,
// not both on one stream: a 'readable' listener takes precedence and
// keeps the stream paused.
readable.on("readable", () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log("Read:", chunk.length, "bytes");
  }
});
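
Readable streams are also async iterables, which gives a third way to consume them (run inside an async function):

JAVASCRIPT
const fs = require("fs");
 
async function consume() {
  for await (const chunk of fs.createReadStream("file.txt")) {
    console.log("Received:", chunk.length, "bytes");
  }
  console.log("No more data");
}
 
consume().catch((err) => console.error("Error:", err));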

Flowing vs Paused Mode

PLAINTEXT
PAUSED MODE (default)
  ┌──────────────┐
  │   Readable   │ ──── .read() ────► Data
  │   Stream     │                   (manual pull)
  └──────────────┘
 
FLOWING MODE (activated by 'data' listener or .pipe())
  ┌──────────────┐
  │   Readable   │ ════════════════► Data
  │   Stream     │                   (automatic push)
  └──────────────┘
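
Switching between the two modes is explicit: pause() stops 'data' events, resume() restarts them, and isPaused() reports the current state. A small sketch:

JAVASCRIPT
const fs = require("fs");
const readable = fs.createReadStream("file.txt");
 
readable.on("data", (chunk) => {
  // Attaching a 'data' listener put the stream into flowing mode
  readable.pause(); // back to paused: no more 'data' events for now
  console.log("Paused?", readable.isPaused()); // true
 
  // Resume after some asynchronous work finishes
  setTimeout(() => readable.resume(), 100);
});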

Creating Readable Streams

JAVASCRIPT
const { Readable } = require("stream");
 
// Method 1: Push data
const pushStream = new Readable({
  read(size) {
    this.push("Hello ");
    this.push("World!");
    this.push(null); // Signal end
  },
});
 
// Method 2: From an iterable
const fromArray = Readable.from(["Hello ", "World!"]);
 
// Method 3: From an async generator
async function* generate() {
  yield "Hello ";
  yield "World!";
}
const fromGenerator = Readable.from(generate());

Custom Readable: Random Number Generator

JAVASCRIPT
const { Readable } = require("stream");
 
class RandomStream extends Readable {
  constructor(options = {}) {
    super(options);
    this.count = options.count || 10;
    this.emitted = 0;
  }
 
  _read() {
    if (this.emitted >= this.count) {
      this.push(null); // End stream
      return;
    }
 
    const random = Math.random().toString() + "\n";
    this.push(random);
    this.emitted++;
  }
}
 
const randomStream = new RandomStream({ count: 5 });
randomStream.pipe(process.stdout);

Writable Streams

Streams that consume data.

Writing to Streams

JAVASCRIPT
const fs = require("fs");
const writable = fs.createWriteStream("output.txt");
 
// Write data
writable.write("Hello ");
writable.write("World!");
writable.end("\nGoodbye!"); // Final write + close
 
// Events
writable.on("finish", () => console.log("All data written"));
writable.on("error", (err) => console.error("Error:", err));

The write() Return Value

write() returns false when the internal buffer is full—this is the backpressure signal:

JAVASCRIPT
const fs = require("fs");
const writable = fs.createWriteStream("output.txt");
 
async function writeLines() {
  for (let i = 0; i < 1000000; i++) {
    const canContinue = writable.write(`Line ${i}\n`);
 
    if (!canContinue) {
      // Buffer is full! Wait for 'drain' before writing more
      await new Promise((resolve) => writable.once("drain", resolve));
    }
  }
 
  writable.end();
}
 
writeLines();

Creating Writable Streams

JAVASCRIPT
const { Writable } = require("stream");
 
const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log("Received:", chunk.toString());
    callback(); // Call when done processing
  },
});
 
writable.write("Hello");
writable.write("World");
writable.end();

Custom Writable: HTTP Uploader

JAVASCRIPT
const { Writable } = require("stream");
const https = require("https");
 
class UploadStream extends Writable {
  constructor(url, options = {}) {
    super(options);
    this.url = url;
    this.chunks = [];
  }
 
  _write(chunk, encoding, callback) {
    this.chunks.push(chunk);
    callback();
  }
 
  _final(callback) {
    const data = Buffer.concat(this.chunks);
 
    const req = https.request(
      this.url,
      {
        method: "POST",
        headers: { "Content-Length": data.length },
      },
      (res) => {
        if (res.statusCode === 200) {
          callback();
        } else {
          callback(new Error(`Upload failed: ${res.statusCode}`));
        }
      },
    );
 
    req.write(data);
    req.end();
  }
}

Transform Streams

Transform streams modify data as it passes through.

JAVASCRIPT
const { Transform } = require("stream");
 
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  },
});
 
process.stdin.pipe(upperCase).pipe(process.stdout);

Custom Transform: JSON Parser

JAVASCRIPT
const { Transform } = require("stream");
 
class JSONParser extends Transform {
  constructor() {
    super({ objectMode: true }); // Output objects, not buffers
    this.buffer = "";
  }
 
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
 
    // Find complete JSON objects (newline-delimited)
    const lines = this.buffer.split("\n");
    this.buffer = lines.pop(); // Keep incomplete line
 
    for (const line of lines) {
      if (line.trim()) {
        try {
          const obj = JSON.parse(line);
          this.push(obj);
        } catch (err) {
          this.emit("error", err);
        }
      }
    }
 
    callback();
  }
 
  _flush(callback) {
    // Process remaining buffer
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer));
      } catch (err) {
        this.emit("error", err);
      }
    }
    callback();
  }
}
 
// Usage
const fs = require("fs");
fs.createReadStream("data.ndjson")
  .pipe(new JSONParser())
  .on("data", (obj) => console.log(obj));

Piping Streams

The pipe() method connects streams:

JAVASCRIPT
const fs = require("fs");
const zlib = require("zlib");
const crypto = require("crypto");
 
// Basic pipe
readable.pipe(writable);
 
// Chain multiple transforms: compress, then encrypt
// (crypto.createCipher is deprecated; use createCipheriv with an explicit key and IV)
const key = crypto.scryptSync("secret", "salt", 24);
const iv = crypto.randomBytes(16);
 
fs.createReadStream("input.txt")
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipheriv("aes-192-cbc", key, iv))
  .pipe(fs.createWriteStream("output.gz.enc"));
 
// pipe() returns the destination, enabling chaining

Error Handling with Pipes

pipe() doesn't propagate errors! Each stream needs its own error handler:

JAVASCRIPT
// BAD: Only catches writable errors
readable.pipe(transform).pipe(writable).on("error", handleError); // Misses readable/transform errors!
 
// GOOD: Handle all errors
readable.on("error", handleError);
transform.on("error", handleError);
writable.on("error", handleError);
 
readable.pipe(transform).pipe(writable);

pipeline(): The Modern Alternative

Use pipeline() for proper error handling and cleanup:

JAVASCRIPT
const { pipeline } = require("stream/promises");
 
// Inside an async function (or an ES module with top-level await)
await pipeline(
  fs.createReadStream("input.txt"),
  zlib.createGzip(),
  fs.createWriteStream("output.gz"),
);
// Automatically handles errors and cleanup
 
// With a callback (older API)
const { pipeline: pipelineCb } = require("stream");
pipelineCb(readable, transform, writable, (err) => {
  if (err) console.error("Pipeline failed:", err);
  else console.log("Pipeline succeeded");
});

Backpressure

Backpressure prevents fast producers from overwhelming slow consumers.

The Problem

PLAINTEXT
┌─────────────┐   1000 MB/s   ┌─────────────┐   10 MB/s   ┌─────────────┐
│  Fast       │ ────────────► │  Transform  │ ──────────► │    Slow     │
│  Source     │               │  (buffer)   │             │    Sink     │
└─────────────┘               └─────────────┘             └─────────────┘

                              Memory grows!

How pipe() Handles It

pipe() automatically manages backpressure:

  1. When writable buffer is full, write() returns false
  2. pipe() pauses the readable stream
  3. When writable drains, it emits 'drain'
  4. pipe() resumes the readable stream
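
The threshold that triggers step 1 is the stream's highWaterMark option: the number of bytes (or objects, in object mode) the internal buffer may hold before write() starts returning false. A small sketch of tuning it; the sizes here are arbitrary:

JAVASCRIPT
const fs = require("fs");
 
const readable = fs.createReadStream("large-file.txt", {
  highWaterMark: 1024 * 1024, // pull up to 1 MB per chunk
});
const writable = fs.createWriteStream("output.txt", {
  highWaterMark: 64 * 1024, // write() returns false once ~64 KB is queued
});
 
readable.pipe(writable); // pipe() still manages pause/resume automatically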

Manual Backpressure

When not using pipe():

JAVASCRIPT
const readable = fs.createReadStream("large-file.txt");
const writable = fs.createWriteStream("output.txt");
 
readable.on("data", (chunk) => {
  const canContinue = writable.write(chunk);
 
  if (!canContinue) {
    readable.pause();
    writable.once("drain", () => readable.resume());
  }
});
 
readable.on("end", () => writable.end());

Warning

Always handle backpressure in production code. Ignoring it leads to memory exhaustion and crashes under load.


Object Mode

By default, streams work with Buffers/strings. Object mode allows any JavaScript value:

JAVASCRIPT
const { Transform } = require("stream");
 
const parseJSON = new Transform({
  objectMode: true, // Enable object mode
  transform(line, encoding, callback) {
    try {
      const obj = JSON.parse(line);
      this.push(obj); // Push object, not buffer
      callback();
    } catch (err) {
      callback(err);
    }
  },
});
 
// Usage
parseJSON.write('{"name": "Alice"}');
parseJSON.on("data", (obj) => {
  console.log(obj.name); // 'Alice'
});

Mixed Mode Streams

You can have different modes for read and write sides:

JAVASCRIPT
const { Transform } = require("stream");
 
const objectToBuffer = new Transform({
  writableObjectMode: true, // Accept objects
  readableObjectMode: false, // Output buffers
  transform(obj, encoding, callback) {
    this.push(JSON.stringify(obj) + "\n");
    callback();
  },
});
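
In use, the write side accepts a JavaScript object while the read side emits ordinary Buffer/string chunks (the sample object is arbitrary):

JAVASCRIPT
objectToBuffer.on("data", (chunk) => {
  console.log(chunk.toString()); // '{"name":"Alice","age":30}\n'
});
 
objectToBuffer.write({ name: "Alice", age: 30 });
objectToBuffer.end();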

Practical Patterns

Processing Large Files

JAVASCRIPT
const { createReadStream } = require("fs");
const { createInterface } = require("readline");
 
async function processLargeFile(filepath) {
  const stream = createReadStream(filepath);
  const rl = createInterface({ input: stream });
 
  let lineNumber = 0;
 
  for await (const line of rl) {
    lineNumber++;
    // Process each line without loading the entire file
    // (processLine is whatever per-line work your app needs)
    await processLine(line);
  }
 
  console.log(`Processed ${lineNumber} lines`);
}

HTTP Streaming

JAVASCRIPT
const http = require("http");
const fs = require("fs");
 
http
  .createServer((req, res) => {
    // Stream file directly to response
    const stream = fs.createReadStream("video.mp4");
 
    res.setHeader("Content-Type", "video/mp4");
    stream.pipe(res);
 
    stream.on("error", (err) => {
      res.statusCode = 500;
      res.end("Error streaming file");
    });
  })
  .listen(3000);

Compression Pipeline

JAVASCRIPT
const { pipeline } = require("stream/promises");
const { createReadStream, createWriteStream } = require("fs");
const { createGzip, createGunzip } = require("zlib");
 
// Compress
await pipeline(
  createReadStream("data.json"),
  createGzip(),
  createWriteStream("data.json.gz"),
);
 
// Decompress
await pipeline(
  createReadStream("data.json.gz"),
  createGunzip(),
  createWriteStream("data-restored.json"),
);

Progress Tracking

JAVASCRIPT
const fs = require("fs");
const { Transform } = require("stream");
const { pipeline } = require("stream/promises");
 
function createProgressStream(total) {
  let transferred = 0;
 
  return new Transform({
    transform(chunk, encoding, callback) {
      transferred += chunk.length;
      const percent = ((transferred / total) * 100).toFixed(1);
      process.stdout.write(`\rProgress: ${percent}%`);
      callback(null, chunk);
    },
  });
}
 
// Usage
const { size } = fs.statSync("large-file.zip");
await pipeline(
  fs.createReadStream("large-file.zip"),
  createProgressStream(size),
  fs.createWriteStream("copy.zip"),
);

Summary

Streams are essential for efficient data handling:

  Stream Type   Purpose           Examples
  Readable      Produce data      File read, HTTP response
  Writable      Consume data      File write, HTTP request
  Transform     Modify data       Compression, encryption
  Duplex        Both directions   TCP socket

Key concepts:

  1. Buffers hold binary data—the currency of streams
  2. Backpressure prevents memory exhaustion
  3. pipeline() handles errors and cleanup properly
  4. Object mode allows streaming JavaScript objects
  5. Flowing vs paused controls data flow mode

Best practices:

  • Use pipeline() instead of pipe() for error handling
  • Always handle backpressure in custom implementations
  • Prefer streams for large data or network operations
  • Use object mode for structured data processing

Note

Understanding streams is what separates Node.js beginners from experts. They're used throughout the ecosystem—HTTP, file system, databases, compression, and more.