Messaging and Integration
Messaging and Integration
As applications grow, components need to communicate reliably without tight coupling. This chapter covers messaging patterns that enable scalable, resilient systems.
Why Messaging?
Direct HTTP calls create tight coupling:
┌─────────┐ ┌─────────┐
│ Service │ HTTP │ Service │
│ A │───────►│ B │ If B is down, A fails
└─────────┘ └─────────┘
Message queues decouple services:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Service │ ─────► │ Queue │ ─────► │ Service │
│ A │ │ │ │ B │
└─────────┘ └─────────┘ └─────────┘
│
If B is down, messages wait
Benefits:
- Decoupling: Services don't need to know about each other
- Resilience: Messages persist if consumers are down
- Scalability: Add more consumers to handle load
- Load leveling: Queue absorbs traffic spikes
Message Queue Patterns
Point-to-Point Queue
Each message is consumed by exactly one consumer:
Producer ──► Queue ──► Consumer 1
└──► Consumer 2 (competing consumers)
└──► Consumer 3
Publish/Subscribe
Messages are delivered to all subscribers:
┌──► Subscriber 1
Publisher ──► Topic ──► Subscriber 2
└──► Subscriber 3
Request/Reply
Synchronous-style communication over async messaging:
Client ──request──► Queue ──► Server
◄──reply───── Reply Queue ◄──┘
Using Redis for Messaging
Simple Queue with Redis Lists
const Redis = require("ioredis");
// Minimal FIFO queue backed by a Redis list: push() LPUSHes to the
// head, pop() BRPOPs from the tail, so messages come out in
// insertion order.
class RedisQueue {
  constructor(queueName) {
    this.queueName = queueName;
    this.redis = new Redis();
  }

  /** Enqueue one message, serialized as JSON. */
  async push(message) {
    const payload = JSON.stringify(message);
    await this.redis.lpush(this.queueName, payload);
  }

  /**
   * Dequeue one message, blocking up to `timeout` seconds
   * (0 = block indefinitely). Returns null on timeout.
   */
  async pop(timeout = 0) {
    const entry = await this.redis.brpop(this.queueName, timeout);
    if (!entry) return null;
    const [, raw] = entry;
    return JSON.parse(raw);
  }

  /** Number of messages currently waiting in the queue. */
  async length() {
    return this.redis.llen(this.queueName);
  }
}
// Producer
const queue = new RedisQueue("tasks");
await queue.push({ type: "email", to: "user@example.com" });
// Consumer
// Poll forever: pop() blocks up to 5 seconds and returns null on
// timeout, so the loop re-checks without busy-waiting.
// NOTE(review): processTask is not defined in this file - illustrative.
while (true) {
const task = await queue.pop(5); // Block for 5 seconds
if (task) {
await processTask(task);
}
}Redis Pub/Sub
const Redis = require("ioredis");
// Publisher
// A Redis connection in subscriber mode cannot issue regular commands,
// so publisher and subscriber use separate connections.
const pub = new Redis();
async function publish(channel, message) {
await pub.publish(channel, JSON.stringify(message));
}
// Subscriber
const sub = new Redis();
// NOTE(review): `err` is ignored here - a failed subscribe would go
// unnoticed; check it in production code.
sub.subscribe("notifications", "events", (err, count) => {
console.log(`Subscribed to ${count} channels`);
});
// Fires once per message on any channel this connection subscribed to.
sub.on("message", (channel, message) => {
const data = JSON.parse(message);
console.log(`[${channel}]`, data);
});
// Usage
await publish("notifications", { type: "alert", message: "Server restarting" });Warning
Redis Pub/Sub doesn't persist messages. If no subscriber is connected, messages are lost. For durability, use Redis Streams or a dedicated message broker.
Redis Streams
Redis Streams provide persistent, replayable message logs:
const Redis = require("ioredis");
// Append-only event log on Redis Streams: unlike Pub/Sub, entries
// persist and can be re-read from any ID.
class RedisStream {
  constructor(streamName) {
    this.streamName = streamName;
    this.redis = new Redis();
  }

  /** Append one entry; '*' lets Redis assign the ID, which is returned. */
  async add(data) {
    // XADD takes a flat field/value list: [k1, v1, k2, v2, ...]
    const fieldValuePairs = Object.entries(data).flat();
    return this.redis.xadd(this.streamName, "*", ...fieldValuePairs);
  }

  /**
   * Read up to `count` entries after `lastId`, blocking up to `block`
   * milliseconds. The default '$' means "only entries added from now
   * on". Returns [{ id, data }], or [] on timeout.
   */
  async read(lastId = "$", count = 10, block = 5000) {
    const reply = await this.redis.xread(
      "BLOCK",
      block,
      "COUNT",
      count,
      "STREAMS",
      this.streamName,
      lastId,
    );
    if (!reply) return [];
    // reply shape: [[streamName, [[id, fields], ...]]]
    const [, entries] = reply[0];
    return entries.map(([id, fields]) => ({
      id,
      data: this.fieldsToObject(fields),
    }));
  }

  // Convert Redis' flat [k1, v1, k2, v2, ...] reply into an object.
  fieldsToObject(fields) {
    const result = {};
    for (let idx = 0; idx < fields.length; idx += 2) {
      result[fields[idx]] = fields[idx + 1];
    }
    return result;
  }
}
// Producer
const stream = new RedisStream("events");
await stream.add({ type: "order", orderId: "123", amount: "99.99" });
// Consumer
let lastId = "0"; // Start from beginning
// Each iteration blocks up to 5s (read's default) and advances lastId
// past every processed entry, so nothing is re-read on the next pass.
while (true) {
const messages = await stream.read(lastId);
for (const { id, data } of messages) {
console.log("Processing:", data);
lastId = id;
}
}RabbitMQ
RabbitMQ is a full-featured message broker:
Basic Queue
const amqp = require("amqplib");
// Producer
/**
 * Publish one JSON message to a durable queue.
 * Opens a fresh connection per call - fine for a demo; reuse
 * connections and channels in production.
 */
async function sendMessage(queue, message) {
  const connection = await amqp.connect("amqp://localhost");
  const channel = await connection.createChannel();
  await channel.assertQueue(queue, { durable: true });
  const body = Buffer.from(JSON.stringify(message));
  channel.sendToQueue(queue, body, {
    persistent: true, // Survive broker restart
  });
  console.log("Sent:", message);
  await channel.close();
  await connection.close();
}
// Consumer
/**
 * Consume JSON messages from a durable queue, one at a time.
 * Successful handling acks the message; handler failures requeue it.
 *
 * FIXES vs. previous version:
 * - `msg` is null when the broker cancels the consumer; guard it
 *   instead of crashing on msg.content.
 * - JSON.parse was outside the try block, so one malformed payload
 *   crashed the consumer with the message left unacked. Parse errors
 *   are now rejected without requeue (they will never parse).
 */
async function consumeMessages(queue, handler) {
  const connection = await amqp.connect("amqp://localhost");
  const channel = await connection.createChannel();
  await channel.assertQueue(queue, { durable: true });
  await channel.prefetch(1); // Process one at a time
  channel.consume(queue, async (msg) => {
    if (!msg) return; // consumer was cancelled by the broker
    let message;
    try {
      message = JSON.parse(msg.content.toString());
    } catch (error) {
      console.error("Invalid message, discarding:", error);
      channel.nack(msg, false, false); // Don't requeue unparseable payloads
      return;
    }
    try {
      await handler(message);
      channel.ack(msg); // Acknowledge successful processing
    } catch (error) {
      console.error("Processing failed:", error);
      channel.nack(msg, false, true); // Requeue for retry
    }
  });
  console.log("Waiting for messages...");
}
// Usage
await sendMessage("tasks", { type: "email", to: "user@example.com" });
// NOTE(review): the handler reads msg.subject and msg.body, which the
// message sent above does not include - presumably illustrative;
// confirm the intended payload shape.
await consumeMessages("tasks", async (msg) => {
await sendEmail(msg.to, msg.subject, msg.body);
});RabbitMQ with Exchange Patterns
const amqp = require("amqplib");
/**
 * Thin wrapper over amqplib exposing the three exchange types plus
 * JSON publish/consume with bounded retries.
 */
class RabbitMQ {
  /** Open a connection and channel. Call before any other method. */
  async connect(url = "amqp://localhost") {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
  }
  // Direct Exchange - route by exact key
  async setupDirect(exchange, queue, routingKey) {
    await this.channel.assertExchange(exchange, "direct", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, routingKey);
  }
  // Topic Exchange - route by pattern (e.g. "order.*")
  async setupTopic(exchange, queue, pattern) {
    await this.channel.assertExchange(exchange, "topic", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, pattern);
  }
  // Fanout Exchange - broadcast to all bound queues (routing key ignored)
  async setupFanout(exchange, queue) {
    await this.channel.assertExchange(exchange, "fanout", { durable: true });
    await this.channel.assertQueue(queue, { durable: true });
    await this.channel.bindQueue(queue, exchange, "");
  }
  /** Publish a persistent JSON message to an exchange. */
  async publish(exchange, routingKey, message) {
    this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      { persistent: true },
    );
  }
  /**
   * Consume JSON messages; handler failures are retried up to 3 times,
   * then the message is rejected (dead-lettered if the queue has a DLX).
   *
   * BUG FIX: the previous version nack'd with requeue=true, which
   * redelivers the ORIGINAL message - its headers never change, so the
   * `retries` counter stayed at 0 and poison messages looped forever.
   * We now ack the original and republish a copy with an incremented
   * `retries` header; once the limit is reached we nack without requeue.
   */
  async consume(queue, handler) {
    await this.channel.prefetch(10);
    this.channel.consume(queue, async (msg) => {
      if (!msg) return; // consumer was cancelled by the broker
      try {
        const data = JSON.parse(msg.content.toString());
        await handler(data);
        this.channel.ack(msg);
      } catch (error) {
        const retries = (msg.properties.headers?.retries || 0) + 1;
        if (retries >= 3) {
          // Give up: route to the dead-letter exchange if configured.
          this.channel.nack(msg, false, false);
        } else {
          // Republish with an updated retry counter, then ack the original.
          this.channel.sendToQueue(queue, msg.content, {
            persistent: true,
            headers: { ...msg.properties.headers, retries },
          });
          this.channel.ack(msg);
        }
      }
    });
  }
}
// Usage - Order events
const mq = new RabbitMQ();
await mq.connect();
// Publisher
await mq.setupTopic("orders", "", "");
await mq.publish("orders", "order.created", { orderId: "123", amount: 99.99 });
await mq.publish("orders", "order.shipped", {
orderId: "123",
trackingNo: "ABC",
});
// Email service - listens to order.created
await mq.setupTopic("orders", "email-queue", "order.created");
await mq.consume("email-queue", async (order) => {
await sendConfirmationEmail(order);
});
// Analytics service - listens to all order events
await mq.setupTopic("orders", "analytics-queue", "order.*");
await mq.consume("analytics-queue", async (event) => {
await trackEvent(event);
});Event-Driven Architecture
Event Sourcing
Store events as the source of truth:
/**
 * Append-only event store keyed by stream id.
 * `storage` needs (possibly async) get(id) -> events[] and
 * set(id, events[]) - a plain Map works for demos.
 */
class EventStore {
  constructor(storage) {
    this.storage = storage;
  }
  /**
   * Append events to a stream, stamping each with streamId, a
   * monotonically increasing version, and an ISO timestamp.
   *
   * @param {string} streamId
   * @param {Array<object>} events
   * @param {number} [expectedVersion] - optional optimistic-concurrency
   *   check (new, backward-compatible): when given, throws if the
   *   stream's current version differs, preventing lost updates from
   *   concurrent writers.
   * @returns the enriched events as stored
   */
  async append(streamId, events, expectedVersion) {
    const existing = (await this.storage.get(streamId)) || [];
    const version = existing.length;
    if (expectedVersion !== undefined && expectedVersion !== version) {
      throw new Error(
        `Concurrency conflict on ${streamId}: expected version ${expectedVersion}, found ${version}`,
      );
    }
    const newEvents = events.map((event, i) => ({
      ...event,
      streamId,
      version: version + i + 1,
      timestamp: new Date().toISOString(),
    }));
    await this.storage.set(streamId, [...existing, ...newEvents]);
    return newEvents;
  }
  /** Events with version > fromVersion (exclusive), in stream order. */
  async getEvents(streamId, fromVersion = 0) {
    const events = (await this.storage.get(streamId)) || [];
    return events.filter((e) => e.version > fromVersion);
  }
}
// Aggregate - rebuilds state from events
// Current state is never stored directly; it is derived by replaying
// the event stream in version order.
class Order {
  constructor(id) {
    this.id = id;
    this.items = [];
    this.status = "pending";
    this.version = 0;
  }

  /** Mutate state for a single event and record its stream version. */
  apply(event) {
    if (event.type === "OrderCreated") {
      this.items = event.data.items;
      this.status = "created";
    } else if (event.type === "OrderShipped") {
      this.status = "shipped";
      this.trackingNumber = event.data.trackingNumber;
    } else if (event.type === "OrderDelivered") {
      this.status = "delivered";
    }
    // Version advances even for unrecognized event types.
    this.version = event.version;
  }

  /** Replay the full event stream to reconstruct the current state. */
  static async fromEvents(eventStore, id) {
    const order = new Order(id);
    for (const event of await eventStore.getEvents(id)) {
      order.apply(event);
    }
    return order;
  }
}
// Usage
// A plain Map suffices as storage: EventStore only calls get/set.
const store = new EventStore(new Map());
// Create order
await store.append("order-123", [
{ type: "OrderCreated", data: { items: ["item1", "item2"] } },
]);
// Ship order
await store.append("order-123", [
{ type: "OrderShipped", data: { trackingNumber: "TRACK123" } },
]);
// Rebuild state
// Replays both events in version order; the latest status wins.
const order = await Order.fromEvents(store, "order-123");
console.log(order.status); // 'shipped'CQRS (Command Query Responsibility Segregation)
Separate read and write models:
// Commands - handle writes
// Each command appends events to the store, then pushes them onto the
// bus so projectors can update the read model.
class OrderCommandHandler {
  constructor(eventStore, eventBus) {
    this.eventStore = eventStore;
    this.eventBus = eventBus;
  }

  /** Record an OrderCreated event and publish it. */
  async createOrder(command) {
    const stored = await this.eventStore.append(command.orderId, [
      { type: "OrderCreated", data: { items: command.items } },
    ]);
    await this.#publishAll(stored);
  }

  /** Record an OrderShipped event and publish it. */
  async shipOrder(command) {
    const stored = await this.eventStore.append(command.orderId, [
      {
        type: "OrderShipped",
        data: { trackingNumber: command.trackingNumber },
      },
    ]);
    await this.#publishAll(stored);
  }

  // Publish sequentially so consumers observe events in stream order.
  async #publishAll(events) {
    for (const event of events) {
      await this.eventBus.publish(event);
    }
  }
}
// Queries - read from optimized view
// Pure read-side facade: no writes, no event handling, just lookups
// against the denormalized store maintained by the projector.
class OrderQueryHandler {
  constructor(readDb) {
    this.readDb = readDb;
  }

  /** Fetch a single order by id. */
  async getOrder(id) {
    return this.readDb.findById("orders", id);
  }

  /** Fetch every order matching a status. */
  async getOrdersByStatus(status) {
    return this.readDb.find("orders", { status });
  }

  /** Count of orders grouped by status. */
  async getOrderStats() {
    const pipeline = [{ $group: { _id: "$status", count: { $sum: 1 } } }];
    return this.readDb.aggregate("orders", pipeline);
  }
}
// Projector - updates read model from events
// Listens to domain events and keeps the denormalized "orders" view
// (read by OrderQueryHandler) in sync with the event stream.
class OrderProjector {
constructor(readDb) {
this.readDb = readDb;
}
// Route one event to the matching read-model mutation.
// Unknown event types are deliberately ignored.
async handle(event) {
switch (event.type) {
case "OrderCreated":
// First event of a stream: create the view row.
await this.readDb.insert("orders", {
id: event.streamId,
items: event.data.items,
status: "created",
createdAt: event.timestamp,
});
break;
case "OrderShipped":
// Subsequent event: patch the existing row in place.
await this.readDb.update(
"orders",
{ id: event.streamId },
{ status: "shipped", trackingNumber: event.data.trackingNumber },
);
break;
}
}
}Saga Pattern
Manage distributed transactions across services:
/**
 * Orchestration-based saga: runs each workflow step in order and, on
 * any failure, executes the compensating actions of the completed
 * steps in reverse.
 */
class OrderSaga {
  constructor(services) {
    this.services = services;
    // Exposed for inspection after execute(); rebuilt on every run.
    this.compensations = [];
  }

  /**
   * Run the order workflow.
   * @returns {{success: true, orderId: string} | {success: false, error: string}}
   *
   * BUG FIX: compensations are now reset at the start of each run.
   * Previously the list accumulated across execute() calls on the same
   * saga instance, so a failure in a later run replayed earlier runs'
   * compensations (double refunds / double inventory releases).
   */
  async execute(orderData) {
    this.compensations = [];
    try {
      // Step 1: Reserve inventory
      const reservation = await this.services.inventory.reserve(
        orderData.items,
      );
      this.compensations.push(() =>
        this.services.inventory.release(reservation.id),
      );
      // Step 2: Charge payment
      const payment = await this.services.payment.charge(
        orderData.paymentMethod,
        orderData.total,
      );
      this.compensations.push(() => this.services.payment.refund(payment.id));
      // Step 3: Create order
      const order = await this.services.orders.create({
        ...orderData,
        reservationId: reservation.id,
        paymentId: payment.id,
      });
      // Step 4: Schedule shipping
      await this.services.shipping.schedule(order.id, orderData.address);
      return { success: true, orderId: order.id };
    } catch (error) {
      // Compensate in reverse order
      console.log("Saga failed, compensating...");
      for (let i = this.compensations.length - 1; i >= 0; i--) {
        try {
          await this.compensations[i]();
        } catch (compError) {
          // Compensation itself failed - log for manual intervention
          console.error("Compensation failed:", compError);
        }
      }
      return { success: false, error: error.message };
    }
  }
}
// Choreography-based saga (event-driven)
// No central orchestrator: each service reacts to events on a shared
// bus and emits its own outcome events.
// NOTE(review): `eventBus`, `this.reserve`, and `this.release` are not
// defined in this file - this is an illustrative skeleton.
class InventoryService {
// Reacts to OrderCreated: attempt to reserve stock and announce the
// outcome so downstream services can proceed or abort the order.
async handleOrderCreated(event) {
try {
const reservation = await this.reserve(event.items);
await eventBus.publish({
type: "InventoryReserved",
orderId: event.orderId,
reservationId: reservation.id,
});
} catch (error) {
// Failure is communicated as an event, not an exception, so other
// services can run their own compensations.
await eventBus.publish({
type: "InventoryReservationFailed",
orderId: event.orderId,
reason: error.message,
});
}
}
// Compensating action: undo the reservation when the order is cancelled.
async handleOrderCancelled(event) {
await this.release(event.reservationId);
}
}Reliable Event Publishing
// Outbox pattern - ensure events are published even if broker is down
// Events are written to an "outbox" table in the same transaction as
// the business data; a background sweep later pushes them to the bus.
class OutboxEventPublisher {
  constructor(db, eventBus) {
    this.db = db;
    this.eventBus = eventBus;
  }

  /**
   * Record an event inside the caller's transaction, so the event row
   * is persisted if and only if the business write commits.
   */
  async publishInTransaction(tx, event) {
    const row = {
      id: crypto.randomUUID(),
      event: JSON.stringify(event),
      status: "pending",
      createdAt: new Date(),
    };
    await tx.insert("outbox", row);
  }

  /**
   * Drain pending outbox rows to the event bus. Rows younger than 5s
   * are skipped so we don't race a transaction that hasn't committed
   * yet. A publish failure increments the attempt counter and leaves
   * the row pending for the next sweep.
   */
  async processOutbox() {
    const cutoff = new Date(Date.now() - 5000); // At least 5s old
    const pending = await this.db.find("outbox", {
      status: "pending",
      createdAt: { $lt: cutoff },
    });
    for (const row of pending) {
      try {
        await this.eventBus.publish(JSON.parse(row.event));
        await this.db.update(
          "outbox",
          { id: row.id },
          { status: "published", publishedAt: new Date() },
        );
      } catch (error) {
        console.error("Failed to publish:", error);
        await this.db.update(
          "outbox",
          { id: row.id },
          { $inc: { attempts: 1 }, lastError: error.message },
        );
      }
    }
  }
}
// Usage
// The business write and the outbox insert share one transaction, so
// either both are persisted or neither is (atomic write + event).
// NOTE(review): `db` and `outboxPublisher` are assumed to be created
// elsewhere - illustrative.
async function createOrder(orderData) {
const tx = await db.startTransaction();
try {
const order = await tx.insert("orders", orderData);
await outboxPublisher.publishInTransaction(tx, {
type: "OrderCreated",
orderId: order.id,
data: order,
});
await tx.commit();
return order;
} catch (error) {
await tx.rollback();
throw error;
}
}
// Run outbox processor periodically
// (interval exceeds the 5s outbox age cutoff used by processOutbox)
setInterval(() => outboxPublisher.processOutbox(), 10000);Summary
Messaging patterns for scalable systems:
| Pattern | Purpose |
|---|---|
| Point-to-Point | Task distribution, load balancing |
| Pub/Sub | Event broadcasting |
| Request/Reply | Async RPC |
| Event Sourcing | Audit trail, temporal queries |
| CQRS | Optimize reads and writes separately |
| Saga | Distributed transactions |
| Outbox | Reliable event publishing |
Key takeaways:
- Decoupling improves resilience and scalability
- Persistence ensures messages survive failures
- Acknowledgments enable at-least-once delivery; pair them with idempotent handlers for effectively-once processing
- Dead letter queues handle poison messages
- Compensating transactions handle distributed failures
Note
Start with simple HTTP calls. Introduce messaging when you need decoupling, resilience, or asynchronous processing. Messaging adds complexity—make sure it's justified.