Learning Guides
Menu

Messaging and Integration

10 min read · Node.js Design Patterns

Messaging and Integration

As applications grow, components need to communicate reliably without tight coupling. This chapter covers messaging patterns that enable scalable, resilient systems.

Why Messaging?

Direct HTTP calls create tight coupling:

PLAINTEXT
┌─────────┐        ┌─────────┐
│ Service │  HTTP  │ Service │
│    A    │───────►│    B    │  If B is down, A fails
└─────────┘        └─────────┘

Message queues decouple services:

PLAINTEXT
┌─────────┐        ┌─────────┐        ┌─────────┐
│ Service │ ─────► │  Queue  │ ─────► │ Service │
│    A    │        │         │        │    B    │
└─────────┘        └─────────┘        └─────────┘

          If B is down, messages wait

Benefits:

  • Decoupling: Services don't need to know about each other
  • Resilience: Messages persist if consumers are down
  • Scalability: Add more consumers to handle load
  • Load leveling: Queue absorbs traffic spikes

Message Queue Patterns

Point-to-Point Queue

Each message is consumed by exactly one consumer:

PLAINTEXT
Producer ──► Queue ──► Consumer 1
                  └──► Consumer 2  (competing consumers)
                  └──► Consumer 3

Publish/Subscribe

Messages are delivered to all subscribers:

PLAINTEXT
                  ┌──► Subscriber 1
Publisher ──► Topic ──► Subscriber 2
                  └──► Subscriber 3

Request/Reply

Synchronous-style communication over async messaging:

PLAINTEXT
Client ──request──► Queue ──► Server
   ◄──reply───── Reply Queue ◄──┘

Using Redis for Messaging

Simple Queue with Redis Lists

JAVASCRIPT
const Redis = require("ioredis");
 
/**
 * Minimal FIFO work queue backed by a Redis list.
 *
 * Producers LPUSH JSON-encoded messages onto the head of the list and
 * consumers BRPOP them from the tail, so messages are consumed in the order
 * they were pushed.
 */
class RedisQueue {
  /**
   * @param {string} queueName - Redis key of the list that stores the queue.
   */
  constructor(queueName) {
    this.queueName = queueName;
    this.redis = new Redis();
    // Created lazily by pop(). A blocking command parks the connection it
    // runs on, so BRPOP must not share a connection with push()/length() or
    // those calls would stall until the blocking read returns.
    this.blockingRedis = null;
  }

  /**
   * Enqueue a message.
   * @param {*} message - Any JSON-serializable value.
   * @returns {Promise<void>}
   */
  async push(message) {
    await this.redis.lpush(this.queueName, JSON.stringify(message));
  }

  /**
   * Dequeue the oldest message, waiting for one to arrive if the queue is empty.
   * @param {number} [timeout=0] - Seconds to wait; 0 waits indefinitely.
   * @returns {Promise<*|null>} The parsed message, or null if the wait timed out.
   */
  async pop(timeout = 0) {
    this.blockingRedis ??= this.redis.duplicate();

    const result = await this.blockingRedis.brpop(this.queueName, timeout);
    if (!result) return null;

    // BRPOP replies with [key, value]; only the value is of interest.
    return JSON.parse(result[1]);
  }

  /**
   * @returns {Promise<number>} Number of messages currently waiting in the queue.
   */
  async length() {
    return this.redis.llen(this.queueName);
  }
}
 
// Producer
const queue = new RedisQueue("tasks");
await queue.push({ type: "email", to: "user@example.com" });

// Consumer: poll forever. pop(5) resolves to null when nothing arrived within
// 5 seconds, which simply starts the next wait.
while (true) {
  const task = await queue.pop(5); // Block for up to 5 seconds
  if (!task) continue;

  try {
    await processTask(task);
  } catch (error) {
    // The task has already been removed from the list by pop(), so it cannot
    // be retried from here; log it instead of letting one bad task end the loop.
    console.error("Task failed:", error);
  }
}

Redis Pub/Sub

JAVASCRIPT
const Redis = require("ioredis");
 
// Publisher
const pub = new Redis();

/**
 * Broadcast a JSON-encoded message to every client currently subscribed
 * to the channel.
 * @param {string} channel - Channel name.
 * @param {*} message - Any JSON-serializable value.
 * @returns {Promise<void>}
 */
async function publish(channel, message) {
  await pub.publish(channel, JSON.stringify(message));
}

// Subscriber - uses its own connection, separate from the publisher's:
// a connection that has subscribed can no longer issue regular commands
// such as PUBLISH.
const sub = new Redis();

sub.subscribe("notifications", "events", (err, count) => {
  if (err) {
    // Without this check a failed subscription would log "undefined channels".
    console.error("Failed to subscribe:", err);
    return;
  }
  console.log(`Subscribed to ${count} channels`);
});

sub.on("message", (channel, message) => {
  let data;
  try {
    data = JSON.parse(message);
  } catch (error) {
    // Any client can publish to the channel; a non-JSON payload must not
    // throw out of the event listener and take the process down.
    console.error(`Ignoring malformed message on ${channel}:`, error);
    return;
  }
  console.log(`[${channel}]`, data);
});

// Usage
await publish("notifications", { type: "alert", message: "Server restarting" });

Warning

Redis Pub/Sub doesn't persist messages. If no subscriber is connected, messages are lost. For durability, use Redis Streams or a dedicated message broker.

Redis Streams

Redis Streams provide persistent, replayable message logs:

JAVASCRIPT
const Redis = require("ioredis");
 
/**
 * Thin wrapper around a single Redis Stream: append entries with XADD and
 * read them back with a blocking XREAD.
 */
class RedisStream {
  /**
   * @param {string} streamName - Redis key of the stream.
   */
  constructor(streamName) {
    this.streamName = streamName;
    this.redis = new Redis();
  }

  /**
   * Append one entry to the stream.
   *
   * Stream entries are flat field/value lists, so array and object values are
   * JSON-encoded into a single value. (Flattening them into the argument list
   * would break the field/value pairing.) Primitives and Buffers are passed
   * through unchanged.
   *
   * @param {Object} data - Field/value pairs of the entry.
   * @returns {Promise<string>} Whatever XADD replies with (the new entry's ID).
   */
  async add(data) {
    const fieldArgs = Object.entries(data).flatMap(([field, value]) => [
      field,
      RedisStream.#encodeValue(value),
    ]);

    // '*' auto-generates message ID
    return this.redis.xadd(this.streamName, "*", ...fieldArgs);
  }

  /**
   * Read entries with an ID greater than lastId, waiting for new ones if none
   * are available yet.
   *
   * @param {string} [lastId="$"] - ID to read after. "$" means only entries
   *   added after this call starts; "0" reads from the beginning.
   * @param {number} [count=10] - Maximum number of entries to return.
   * @param {number} [block=5000] - Milliseconds to wait for entries.
   * @returns {Promise<Array<{id: string, data: Object}>>} Entries in stream
   *   order; empty if the wait timed out.
   */
  async read(lastId = "$", count = 10, block = 5000) {
    const result = await this.redis.xread(
      "BLOCK",
      block,
      "COUNT",
      count,
      "STREAMS",
      this.streamName,
      lastId,
    );

    if (!result) return [];

    // Reply shape: [[streamName, [[id, [field, value, ...]], ...]]].
    // Only one stream is requested, so only result[0] is relevant.
    return result[0][1].map(([id, fields]) => ({
      id,
      data: this.fieldsToObject(fields),
    }));
  }

  /**
   * Convert a flat [field, value, field, value, ...] list into an object.
   * @param {string[]} fields - Alternating field names and values.
   * @returns {Object} Object keyed by field name.
   */
  fieldsToObject(fields) {
    const obj = {};
    for (let i = 0; i < fields.length; i += 2) {
      obj[fields[i]] = fields[i + 1];
    }
    return obj;
  }

  // Collapse a structured value into one stream value; leave the rest alone.
  static #encodeValue(value) {
    const isStructured =
      value !== null && typeof value === "object" && !Buffer.isBuffer(value);
    return isStructured ? JSON.stringify(value) : value;
  }
}
 
// Producer: append one order event to the "events" stream
const stream = new RedisStream("events");
await stream.add({ type: "order", orderId: "123", amount: "99.99" });

// Consumer: replay the stream from the beginning, then keep following it.
// lastId always holds the ID of the most recently handled entry, so each
// read asks only for entries that come after it.
let lastId = "0";
for (;;) {
  const batch = await stream.read(lastId);
  for (const entry of batch) {
    console.log("Processing:", entry.data);
    lastId = entry.id;
  }
}

RabbitMQ

RabbitMQ is a full-featured message broker:

Basic Queue

JAVASCRIPT
const amqp = require("amqplib");
 
// Producer
/**
 * Publish one persistent, JSON-encoded message to a durable queue, using a
 * short-lived connection that is always closed again.
 *
 * @param {string} queue - Name of the queue (declared durable if missing).
 * @param {*} message - Any JSON-serializable value.
 * @returns {Promise<void>}
 */
async function sendMessage(queue, message) {
  const connection = await amqp.connect("amqp://localhost");

  try {
    const channel = await connection.createChannel();

    await channel.assertQueue(queue, { durable: true });

    channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
      persistent: true, // Survive broker restart
    });

    console.log("Sent:", message);

    await channel.close();
  } finally {
    // Runs on the failure paths too, so a failed createChannel/assertQueue
    // does not leave the connection open.
    await connection.close();
  }
}
 
// Consumer
/**
 * Consume JSON messages from a durable queue one at a time, acknowledging
 * each message only after the handler has finished with it.
 *
 * @param {string} queue - Name of the queue (declared durable if missing).
 * @param {(message: *) => Promise<void>} handler - Called with each parsed
 *   message. A rejection puts the message back on the queue for retry.
 * @returns {Promise<void>} Resolves once the consumer is registered; the
 *   connection stays open to keep receiving messages.
 */
async function consumeMessages(queue, handler) {
  const connection = await amqp.connect("amqp://localhost");
  const channel = await connection.createChannel();

  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1); // Process one at a time

  await channel.consume(queue, async (msg) => {
    // The callback is invoked with null when the broker cancels the consumer.
    if (!msg) return;

    let message;
    try {
      message = JSON.parse(msg.content.toString());
    } catch (error) {
      // A payload that cannot be parsed will never parse on a retry either,
      // so reject it without requeueing instead of leaving it unacknowledged.
      console.error("Discarding malformed message:", error);
      channel.nack(msg, false, false);
      return;
    }

    try {
      await handler(message);
      channel.ack(msg); // Acknowledge successful processing
    } catch (error) {
      console.error("Processing failed:", error);
      channel.nack(msg, false, true); // Requeue for retry
    }
  });

  console.log("Waiting for messages...");
}
 
// Usage
// The message carries every field the consumer below reads (to, subject, body).
await sendMessage("tasks", {
  type: "email",
  to: "user@example.com",
  subject: "Welcome",
  body: "Thanks for signing up!",
});
await consumeMessages("tasks", async (msg) => {
  await sendEmail(msg.to, msg.subject, msg.body);
});

RabbitMQ with Exchange Patterns

JAVASCRIPT
const amqp = require("amqplib");
 
/**
 * Convenience wrapper around one amqplib connection and channel, with helpers
 * for the three common exchange types plus publish/consume.
 *
 * connect() must be awaited before any other method is used.
 */
class RabbitMQ {
  /**
   * Open the connection and the channel used by every other method.
   * @param {string} [url="amqp://localhost"] - Broker URL.
   * @returns {Promise<void>}
   */
  async connect(url = "amqp://localhost") {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }

  // Direct Exchange - route by exact key
  /**
   * @param {string} exchange - Exchange name (declared durable).
   * @param {string} queue - Queue name (declared durable).
   * @param {string} routingKey - Key a message must carry to reach the queue.
   * @returns {Promise<void>}
   */
  async setupDirect(exchange, queue, routingKey) {
    await this.channel.assertExchange(exchange, "direct", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, routingKey);
  }

  // Topic Exchange - route by pattern
  /**
   * @param {string} exchange - Exchange name (declared durable).
   * @param {string} queue - Queue name (declared durable).
   * @param {string} pattern - Binding pattern, e.g. "order.*".
   * @returns {Promise<void>}
   */
  async setupTopic(exchange, queue, pattern) {
    await this.channel.assertExchange(exchange, "topic", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, pattern);
  }

  // Fanout Exchange - broadcast to all
  /**
   * @param {string} exchange - Exchange name (declared durable).
   * @param {string} queue - Queue name (declared durable).
   * @returns {Promise<void>}
   */
  async setupFanout(exchange, queue) {
    await this.channel.assertExchange(exchange, "fanout", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, "");
  }

  /**
   * Publish a persistent, JSON-encoded message to an exchange.
   * @param {string} exchange - Target exchange.
   * @param {string} routingKey - Routing key attached to the message.
   * @param {*} message - Any JSON-serializable value.
   * @returns {Promise<void>}
   */
  async publish(exchange, routingKey, message) {
    this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true },
    );
  }

  /**
   * Consume JSON messages from a queue, with up to 10 unacknowledged messages
   * in flight. A message whose handling fails is retried; on its third failed
   * attempt it is rejected without requeue.
   *
   * @param {string} queue - Queue to consume from.
   * @param {(data: *) => Promise<void>} handler - Called with each parsed message.
   * @returns {Promise<void>} Resolves once the consumer is registered.
   */
  async consume(queue, handler) {
    await this.channel.prefetch(10);

    await this.channel.consume(queue, async (msg) => {
      // Invoked with null when the broker cancels the consumer.
      if (!msg) return;

      try {
        const data = JSON.parse(msg.content.toString());
        await handler(data);
        this.channel.ack(msg);
      } catch (error) {
        console.error("Processing failed:", error);

        // Dead letter after 3 retries
        const headers = msg.properties.headers ?? {};
        const retries = (headers.retries ?? 0) + 1;

        if (retries >= 3) {
          // Reject for good. The broker dead-letters the message only if the
          // queue was declared with a dead-letter exchange.
          this.channel.nack(msg, false, false);
        } else {
          // nack-with-requeue redelivers the message unchanged, so a counter
          // kept in its headers would never advance. Queue a copy carrying
          // the incremented count and acknowledge the original instead.
          this.channel.sendToQueue(queue, msg.content, {
            ...msg.properties,
            headers: { ...headers, retries },
          });
          this.channel.ack(msg);
        }
      }
    });
  }
}
 
// Usage - Order events
const mq = new RabbitMQ();
await mq.connect();

// Queues are bound BEFORE anything is published: an exchange drops a message
// that no bound queue matches, so events sent before the bindings exist would
// never reach these consumers.

// Email service - listens to order.created
await mq.setupTopic("orders", "email-queue", "order.created");
await mq.consume("email-queue", async (order) => {
  await sendConfirmationEmail(order);
});

// Analytics service - listens to all order events
await mq.setupTopic("orders", "analytics-queue", "order.*");
await mq.consume("analytics-queue", async (event) => {
  await trackEvent(event);
});

// Publisher - only needs the exchange to exist, not a queue of its own
await mq.channel.assertExchange("orders", "topic", { durable: true });
await mq.publish("orders", "order.created", { orderId: "123", amount: 99.99 });
await mq.publish("orders", "order.shipped", {
  orderId: "123",
  trackingNo: "ABC",
});

Event-Driven Architecture

Event Sourcing

Store events as the source of truth:

JAVASCRIPT
/**
 * Append-only store that keeps one ordered list of events per stream.
 *
 * The backing storage only needs async-compatible get(key) and
 * set(key, value) methods; a Map works.
 */
class EventStore {
  /**
   * @param {{get: Function, set: Function}} storage - Key/value backend.
   */
  constructor(storage) {
    this.storage = storage;
  }

  /**
   * Add events to the end of a stream, stamping each with its stream ID,
   * a 1-based version that continues from the stream's current length,
   * and an ISO timestamp.
   *
   * @param {string} streamId - Stream to append to.
   * @param {Object[]} events - Events to append, in order.
   * @returns {Promise<Object[]>} The stamped events that were stored.
   */
  async append(streamId, events) {
    const history = (await this.storage.get(streamId)) || [];
    const baseVersion = history.length;

    const stamped = events.map((event, offset) => {
      const version = baseVersion + offset + 1;
      const timestamp = new Date().toISOString();
      return { ...event, streamId, version, timestamp };
    });

    await this.storage.set(streamId, history.concat(stamped));
    return stamped;
  }

  /**
   * Fetch a stream's events newer than a given version.
   *
   * @param {string} streamId - Stream to read.
   * @param {number} [fromVersion=0] - Return only events with a higher version.
   * @returns {Promise<Object[]>} Matching events; empty for an unknown stream.
   */
  async getEvents(streamId, fromVersion = 0) {
    const stored = await this.storage.get(streamId);
    if (!stored) {
      return [];
    }
    return stored.filter(({ version }) => version > fromVersion);
  }
}
 
// Aggregate - rebuilds state from events
/**
 * Order aggregate whose current state is derived purely by replaying its
 * events in order.
 */
class Order {
  /**
   * @param {string} id - Order (and event stream) identifier.
   */
  constructor(id) {
    this.id = id;
    this.items = [];
    this.status = "pending";
    this.version = 0;
  }

  /**
   * Fold a single event into the aggregate. Unrecognized event types leave
   * the state untouched, but every event advances the version.
   *
   * @param {{type: string, version: number, data?: Object}} event
   */
  apply(event) {
    const { type } = event;

    if (type === "OrderCreated") {
      this.items = event.data.items;
      this.status = "created";
    } else if (type === "OrderShipped") {
      this.status = "shipped";
      this.trackingNumber = event.data.trackingNumber;
    } else if (type === "OrderDelivered") {
      this.status = "delivered";
    }

    this.version = event.version;
  }

  /**
   * Load an order by replaying its whole event stream.
   *
   * @param {{getEvents: Function}} eventStore - Source of the events.
   * @param {string} id - Order identifier.
   * @returns {Promise<Order>} The rebuilt order.
   */
  static async fromEvents(eventStore, id) {
    const rebuilt = new Order(id);
    const history = await eventStore.getEvents(id);
    for (const event of history) {
      rebuilt.apply(event);
    }
    return rebuilt;
  }
}
 
// Usage
// A Map is enough as storage here: EventStore only calls get() and set() on it.
const store = new EventStore(new Map());
 
// Create order
// First append to the stream, so this event is stored as version 1.
await store.append("order-123", [
  { type: "OrderCreated", data: { items: ["item1", "item2"] } },
]);
 
// Ship order
// Appended to the same stream, continuing the version sequence (version 2).
await store.append("order-123", [
  { type: "OrderShipped", data: { trackingNumber: "TRACK123" } },
]);
 
// Rebuild state
// Replays both events in order; the last one applied determines the status.
const order = await Order.fromEvents(store, "order-123");
console.log(order.status); // 'shipped'

CQRS (Command Query Responsibility Segregation)

Separate read and write models:

JAVASCRIPT
// Commands - handle writes
/**
 * Write side of the order model: turns each command into an event, stores
 * it, and then announces the stored event on the bus.
 */
class OrderCommandHandler {
  /**
   * @param {{append: Function}} eventStore - Where events are persisted.
   * @param {{publish: Function}} eventBus - Where stored events are announced.
   */
  constructor(eventStore, eventBus) {
    this.eventStore = eventStore;
    this.eventBus = eventBus;
  }

  /**
   * Record that an order was created.
   * @param {{orderId: string, items: Array}} command
   * @returns {Promise<void>}
   */
  async createOrder(command) {
    const { orderId, items } = command;
    await this.#record(orderId, { type: "OrderCreated", data: { items } });
  }

  /**
   * Record that an order was shipped.
   * @param {{orderId: string, trackingNumber: string}} command
   * @returns {Promise<void>}
   */
  async shipOrder(command) {
    const { orderId, trackingNumber } = command;
    await this.#record(orderId, {
      type: "OrderShipped",
      data: { trackingNumber },
    });
  }

  // Persist one event on the stream, then publish what the store returned
  // (the stamped copies), one at a time and in order.
  async #record(streamId, event) {
    const stored = await this.eventStore.append(streamId, [event]);
    for (const storedEvent of stored) {
      await this.eventBus.publish(storedEvent);
    }
  }
}
 
// Queries - read from optimized view
/**
 * Read side of the order model. Every method is a direct pass-through to the
 * "orders" collection of the read database; nothing here writes.
 */
class OrderQueryHandler {
  /**
   * @param {object} readDb - Read-model database; must provide findById,
   *   find and aggregate, each taking the collection name first.
   */
  constructor(readDb) {
    this.readDb = readDb;
  }
 
  /**
   * Look up a single order.
   * @param {*} id - Order identifier, passed to readDb.findById unchanged.
   * @returns {Promise<*>} Whatever readDb.findById resolves to.
   */
  async getOrder(id) {
    return this.readDb.findById("orders", id);
  }
 
  /**
   * List orders in a given status.
   * @param {string} status - Value to match against each order's status field.
   * @returns {Promise<*>} Whatever readDb.find resolves to.
   */
  async getOrdersByStatus(status) {
    return this.readDb.find("orders", { status });
  }
 
  /**
   * Count orders per status.
   * @returns {Promise<*>} Whatever readDb.aggregate resolves to.
   */
  async getOrderStats() {
    // NOTE(review): the pipeline looks like MongoDB aggregation syntax (group
    // by status, add 1 per order) — confirm readDb.aggregate accepts it.
    return this.readDb.aggregate("orders", [
      { $group: { _id: "$status", count: { $sum: 1 } } },
    ]);
  }
}
 
// Projector - updates read model from events
/**
 * Keeps the "orders" read model in step with the event stream by translating
 * each event into one write against the read database.
 */
class OrderProjector {
  /**
   * @param {{insert: Function, update: Function}} readDb - Read-model database.
   */
  constructor(readDb) {
    this.readDb = readDb;
  }

  /**
   * Project one event onto the read model. Event types other than
   * OrderCreated and OrderShipped are ignored.
   *
   * @param {{type: string, streamId: string, data: Object, timestamp?: string}} event
   * @returns {Promise<void>}
   */
  async handle(event) {
    if (event.type === "OrderCreated") {
      // A new order becomes a new row keyed by the stream it belongs to.
      const row = {
        id: event.streamId,
        items: event.data.items,
        status: "created",
        createdAt: event.timestamp,
      };
      await this.readDb.insert("orders", row);
      return;
    }

    if (event.type === "OrderShipped") {
      const selector = { id: event.streamId };
      const changes = {
        status: "shipped",
        trackingNumber: event.data.trackingNumber,
      };
      await this.readDb.update("orders", selector, changes);
    }
  }
}

Saga Pattern

Manage distributed transactions across services:

JAVASCRIPT
/**
 * Orchestration-based saga for placing an order: runs the steps in sequence
 * and, if any step fails, undoes the completed ones in reverse order.
 */
class OrderSaga {
  /**
   * @param {{inventory: object, payment: object, orders: object, shipping: object}} services
   *   Clients for the participating services.
   */
  constructor(services) {
    this.services = services;
    // Undo actions of the most recent execute() call.
    this.compensations = [];
  }

  /**
   * Run the saga for one order.
   *
   * @param {{items: Array, paymentMethod: *, total: number, address: *}} orderData
   * @returns {Promise<{success: true, orderId: *}|{success: false, error: string}>}
   *   Never rejects for a failed step; the failure is reported in the result.
   */
  async execute(orderData) {
    // Each run gets its own list. Accumulating on the instance across calls
    // would make a later failure release and refund orders that an earlier
    // call on the same saga completed successfully.
    const compensations = [];
    this.compensations = compensations;

    try {
      // Step 1: Reserve inventory
      const reservation = await this.services.inventory.reserve(
        orderData.items,
      );
      compensations.push(() =>
        this.services.inventory.release(reservation.id),
      );

      // Step 2: Charge payment
      const payment = await this.services.payment.charge(
        orderData.paymentMethod,
        orderData.total,
      );
      compensations.push(() => this.services.payment.refund(payment.id));

      // Step 3: Create order
      // NOTE(review): no compensation is registered for this step, so a
      // shipping failure leaves the created order in place — confirm whether
      // the orders service offers a cancel operation.
      const order = await this.services.orders.create({
        ...orderData,
        reservationId: reservation.id,
        paymentId: payment.id,
      });

      // Step 4: Schedule shipping
      await this.services.shipping.schedule(order.id, orderData.address);

      return { success: true, orderId: order.id };
    } catch (error) {
      // Compensate in reverse order
      console.log("Saga failed, compensating...");

      for (const compensate of [...compensations].reverse()) {
        try {
          await compensate();
        } catch (compError) {
          // Keep going: one failed undo must not prevent the others.
          console.error("Compensation failed:", compError);
          // Log for manual intervention
        }
      }

      return { success: false, error: error.message };
    }
  }
}
 
// Choreography-based saga (event-driven)
/**
 * Inventory participant of a choreographed saga: it reacts to order events
 * and reports the outcome as events of its own.
 */
class InventoryService {
  /**
   * Try to reserve stock for a new order and publish the outcome.
   *
   * NOTE(review): publishing the success event happens inside the same try
   * block as the reservation, so a failure of that publish is also reported
   * as InventoryReservationFailed — confirm this is intended.
   *
   * @param {{orderId: *, items: Array}} event - The order-created event.
   * @returns {Promise<void>}
   */
  async handleOrderCreated(event) {
    try {
      const reservation = await this.reserve(event.items);
      const reserved = {
        type: "InventoryReserved",
        orderId: event.orderId,
        reservationId: reservation.id,
      };
      await eventBus.publish(reserved);
    } catch (failure) {
      const rejected = {
        type: "InventoryReservationFailed",
        orderId: event.orderId,
        reason: failure.message,
      };
      await eventBus.publish(rejected);
    }
  }

  /**
   * Give back the stock held for a cancelled order.
   * @param {{reservationId: *}} event - The order-cancelled event.
   * @returns {Promise<void>}
   */
  async handleOrderCancelled(event) {
    const { reservationId } = event;
    await this.release(reservationId);
  }
}

Reliable Event Publishing

JAVASCRIPT
// Outbox pattern - ensure events are published even if broker is down
/**
 * Events are first written to an "outbox" table together with the business
 * data, and a background job later relays them to the event bus. A record is
 * only marked published after the bus accepted it, so an event may be
 * published more than once but is not silently lost.
 */
class OutboxEventPublisher {
  // True while a processOutbox() run is in flight.
  #isProcessing = false;

  /**
   * @param {{find: Function, update: Function}} db - Database holding the outbox table.
   * @param {{publish: Function}} eventBus - Destination for relayed events.
   */
  constructor(db, eventBus) {
    this.db = db;
    this.eventBus = eventBus;
  }

  // Save event to outbox in same transaction as business data
  /**
   * @param {{insert: Function}} tx - The open transaction to write through.
   * @param {*} event - JSON-serializable event to publish later.
   * @returns {Promise<void>}
   */
  async publishInTransaction(tx, event) {
    await tx.insert("outbox", {
      id: crypto.randomUUID(),
      event: JSON.stringify(event),
      status: "pending",
      createdAt: new Date(),
    });
  }

  // Background job publishes pending events
  /**
   * Relay every pending outbox record that is at least 5 seconds old.
   * A call made while a previous run is still in progress returns at once,
   * so a slow run cannot overlap with the next tick and publish the same
   * records twice.
   *
   * @returns {Promise<void>}
   */
  async processOutbox() {
    if (this.#isProcessing) return;
    this.#isProcessing = true;

    try {
      const pending = await this.db.find("outbox", {
        status: "pending",
        createdAt: { $lt: new Date(Date.now() - 5000) }, // At least 5s old
      });

      for (const record of pending) {
        await this.#relay(record);
      }
    } finally {
      this.#isProcessing = false;
    }
  }

  // Publish one record and mark it published; on failure, note the attempt.
  // Never throws, so one bad record cannot stop the rest of the batch.
  async #relay(record) {
    try {
      await this.eventBus.publish(JSON.parse(record.event));
      await this.db.update(
        "outbox",
        { id: record.id },
        { status: "published", publishedAt: new Date() },
      );
    } catch (error) {
      console.error("Failed to publish:", error);
      try {
        await this.db.update(
          "outbox",
          { id: record.id },
          { $inc: { attempts: 1 }, lastError: error.message },
        );
      } catch (recordError) {
        // The record stays pending and is picked up again on a later run.
        console.error("Failed to record publish failure:", recordError);
      }
    }
  }
}
 
// Usage
/**
 * Insert an order and its OrderCreated outbox record in one transaction, so
 * that either both are stored or neither is.
 *
 * @param {Object} orderData - Row to insert into "orders".
 * @returns {Promise<Object>} The inserted order as returned by the database.
 * @throws Rethrows whatever made the insert, the outbox write, or the commit fail.
 */
async function createOrder(orderData) {
  const tx = await db.startTransaction();

  try {
    const order = await tx.insert("orders", orderData);

    await outboxPublisher.publishInTransaction(tx, {
      type: "OrderCreated",
      orderId: order.id,
      data: order,
    });

    await tx.commit();
    return order;
  } catch (error) {
    try {
      await tx.rollback();
    } catch (rollbackError) {
      // Report it, but let the caller see the error that caused the rollback
      // rather than the rollback failure that would otherwise replace it.
      console.error("Rollback failed:", rollbackError);
    }
    throw error;
  }
}
 
// Run outbox processor periodically
setInterval(() => {
  // Nothing awaits the timer callback, so a rejection has to be handled
  // here; otherwise it surfaces as an unhandled promise rejection.
  outboxPublisher.processOutbox().catch((error) => {
    console.error("Outbox processing failed:", error);
  });
}, 10000);

Summary

Messaging patterns for scalable systems:

| Pattern | Purpose |
| --- | --- |
| Point-to-Point | Task distribution, load balancing |
| Pub/Sub | Event broadcasting |
| Request/Reply | Async RPC |
| Event Sourcing | Audit trail, temporal queries |
| CQRS | Optimize reads and writes separately |
| Saga | Distributed transactions |
| Outbox | Reliable event publishing |

Key takeaways:

  1. Decoupling improves resilience and scalability
  2. Persistence ensures messages survive failures
  3. Acknowledgments enable at-least-once delivery; make consumers idempotent, because an unacknowledged message can be delivered again
  4. Dead letter queues handle poison messages
  5. Compensating transactions handle distributed failures

Note

Start with simple HTTP calls. Introduce messaging when you need decoupling, resilience, or asynchronous processing. Messaging adds complexity—make sure it's justified.