Message Queues & Event-Driven Architecture

📖 Concept

Message queues decouple services by allowing them to communicate asynchronously through a broker. The sender doesn't wait for the receiver to process the message.

Popular message brokers:

  Broker          Strength                        Use Case
  RabbitMQ        Flexible routing, reliability   Task queues, pub/sub
  Apache Kafka    High throughput, log-based      Event streaming, analytics
  Redis (BullMQ)  Simple, fast, built on Redis    Job queues, delayed tasks
  AWS SQS         Managed, serverless             Cloud-native queues

Message queue patterns:

  1. Point-to-Point (Queue) — one sender, one consumer (task distribution)
  2. Pub/Sub (Topics) — one publisher, multiple subscribers (event broadcasting)
  3. Fan-out — one message goes to multiple queues
  4. Dead Letter Queue — failed messages collected for manual review
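The first two patterns differ only in delivery semantics: a queue hands each message to exactly one consumer, while a topic hands each message to every subscriber. A tiny in-memory sketch makes this concrete (no real broker here — the `MiniBroker` class is purely illustrative; a production system would use RabbitMQ, Kafka, etc.):

```javascript
// Minimal in-memory sketch of point-to-point vs pub/sub delivery.
class MiniBroker {
  constructor() {
    this.consumers = new Map();   // queueName -> [handler, ...]
    this.subscribers = new Map(); // topicName -> [handler, ...]
    this.rr = new Map();          // round-robin cursor per queue
  }

  consume(queue, handler) {
    const list = this.consumers.get(queue) || [];
    list.push(handler);
    this.consumers.set(queue, list);
  }

  subscribe(topic, handler) {
    const list = this.subscribers.get(topic) || [];
    list.push(handler);
    this.subscribers.set(topic, list);
  }

  // Point-to-point: exactly ONE consumer receives each message (round-robin)
  send(queue, msg) {
    const list = this.consumers.get(queue) || [];
    if (list.length === 0) return;
    const i = this.rr.get(queue) || 0;
    list[i % list.length](msg);
    this.rr.set(queue, i + 1);
  }

  // Pub/sub: EVERY subscriber receives each message
  publish(topic, msg) {
    for (const handler of this.subscribers.get(topic) || []) handler(msg);
  }
}
```

With two consumers on a queue, `send` alternates between them (task distribution); with two subscribers on a topic, `publish` reaches both (event broadcasting).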

BullMQ (Node.js job queue built on Redis):

  • Priority queues
  • Delayed jobs
  • Job retry with backoff
  • Rate limiting
  • Job progress tracking
  • Repeatable jobs (cron-like)
  • Scheduled jobs

When to use message queues:

  • ✅ Email/SMS sending (async, no blocking)
  • ✅ Image/video processing (CPU-heavy, offload)
  • ✅ Order processing (multi-step workflow)
  • ✅ Data pipelines and ETL (high throughput)
  • ✅ Event broadcasting to multiple services

🏠 Real-world analogy: A message queue is like a post office. You drop a letter (message) in the mailbox (queue). The postal service (broker) ensures it reaches the recipient (consumer), even if they're not home (offline). The sender doesn't wait at the door — they go about their business.

💻 Code Example

// Message Queues with BullMQ

const { Queue, Worker, FlowProducer } = require("bullmq");
const IORedis = require("ioredis");

// Redis connection (shared)
const connection = new IORedis({
  host: process.env.REDIS_HOST || "localhost",
  port: 6379,
  maxRetriesPerRequest: null, // Required by BullMQ
});

// 1. Create queues
const emailQueue = new Queue("emails", { connection });
const imageQueue = new Queue("image-processing", { connection });

// 2. Add jobs to queues
async function sendWelcomeEmail(userId, email) {
  await emailQueue.add(
    "welcome-email",
    { userId, email, template: "welcome" },
    {
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
      removeOnComplete: 100, // Keep only the last 100 completed jobs
      removeOnFail: 500,
    }
  );
}

async function processImage(imageUrl, options) {
  await imageQueue.add(
    "resize",
    { imageUrl, width: options.width, height: options.height },
    {
      priority: options.priority || 5, // Lower number = higher priority
    }
  );
}

// Delayed job (send reminder in 24 hours)
async function scheduleReminder(userId) {
  await emailQueue.add(
    "reminder",
    { userId, template: "reminder" },
    { delay: 24 * 60 * 60 * 1000 } // 24 hours in milliseconds
  );
}

// Repeating job (daily report)
async function setupDailyReport() {
  await emailQueue.add(
    "daily-report",
    { template: "daily-report" },
    { repeat: { pattern: "0 9 * * *" } } // Every day at 9 AM (cron syntax)
  );
}

// 3. Process jobs (workers)
const emailWorker = new Worker(
  "emails",
  async (job) => {
    console.log(`Processing email job ${job.id}: ${job.name}`);
    const { email, template } = job.data;

    // Update progress
    await job.updateProgress(10);

    // Simulate sending email
    // await sendEmail(email, template);
    console.log(`Email sent to ${email} (template: ${template})`);

    await job.updateProgress(100);
    return { sent: true, email };
  },
  {
    connection,
    concurrency: 5, // Process up to 5 emails simultaneously
    limiter: {
      max: 100,
      duration: 60000, // Max 100 emails per minute
    },
  }
);

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    console.log(`Processing image: ${job.data.imageUrl}`);
    // const result = await sharp(job.data.imageUrl)
    //   .resize(job.data.width, job.data.height)
    //   .toFile(outputPath);
    return { processed: true };
  },
  { connection, concurrency: 2 }
);

// 4. Event handling
emailWorker.on("completed", (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailWorker.on("failed", (job, err) => {
  console.error(`Job ${job.id} failed after ${job.attemptsMade} attempts:`, err.message);
  // Once all retries are exhausted, the job stays in the "failed" state —
  // implement dead letter queue logic or alerting here
});

emailWorker.on("progress", (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

// 5. Flows (dependent jobs) — use FlowProducer
// BullMQ flows are trees: a parent job runs only after ALL of its children
// complete. Here "send-confirmation" waits for payment and inventory; the
// initial validation step runs inline before the flow is enqueued.
const flowProducer = new FlowProducer({ connection });

async function processOrderFlow(orderData) {
  // await validateOrder(orderData); // Step A: validate before enqueueing
  return flowProducer.add({
    name: "send-confirmation", // Step D: runs after B and C complete
    queueName: "orders",
    data: orderData,
    children: [
      { name: "process-payment", queueName: "orders", data: orderData },   // B
      { name: "reserve-inventory", queueName: "orders", data: orderData }, // C
    ],
  });
}

// 6. Graceful shutdown
async function shutdown() {
  console.log("Shutting down workers...");
  await emailWorker.close(); // Waits for active jobs to finish
  await imageWorker.close();
  await connection.quit();
  process.exit(0);
}

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

module.exports = { emailQueue, imageQueue, sendWelcomeEmail, processImage };

🏋️ Practice Exercise

Exercises:

  1. Set up BullMQ with Redis and create queues for email, image processing, and reports
  2. Implement email sending with retry logic — 3 attempts with exponential backoff
  3. Build an image processing pipeline with priority queues (user-uploaded = high, batch = low)
  4. Implement delayed jobs — send a reminder email 24 hours after signup
  5. Set up repeating jobs for daily reports and hourly health checks using cron patterns
  6. Build a dashboard that shows queue health: pending, active, completed, and failed job counts
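For exercise 2, it helps to see how the retry delays grow. BullMQ's built-in "exponential" backoff is commonly described as `delay * 2^(attempt - 1)` — a quick sketch of that formula (verify it against your BullMQ version; `backoffDelay` is a hypothetical helper, not a BullMQ API):

```javascript
// Exponential backoff: each retry waits twice as long as the previous one.
function backoffDelay(baseMs, attempt) {
  return baseMs * 2 ** (attempt - 1);
}

// With { attempts: 3, backoff: { type: "exponential", delay: 2000 } },
// retries wait roughly: 2000 ms, 4000 ms, 8000 ms.
console.log([1, 2, 3].map((n) => backoffDelay(2000, n))); // → [2000, 4000, 8000]
```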

⚠️ Common Mistakes

  • Processing messages without idempotency — messages may be delivered more than once; ensure processing the same message twice has no side effects

  • Not setting up dead letter queues — failed messages disappear silently; capture them for debugging and manual retry

  • Not limiting concurrency on workers — processing too many jobs simultaneously can overwhelm the database or external APIs

  • Storing large payloads in job data — queues should only contain references (IDs, URLs); store the actual data in the database or object storage

  • Not implementing graceful shutdown for workers — worker.close() waits for active jobs to complete; calling process.exit() directly loses in-progress work
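The idempotency point is the one that bites most often. A minimal sketch of the idea, using an in-memory `Set` purely for illustration (in production you would track processed message IDs with something durable and atomic, such as Redis `SET key value NX` or a unique database constraint):

```javascript
// Idempotent consumer sketch: remember which message IDs were already
// processed, so a redelivered message produces no duplicate side effects.
const processed = new Set(); // illustrative only — not shared or durable

async function handleMessage(msg, sideEffect) {
  if (processed.has(msg.id)) return false; // duplicate delivery: skip
  await sideEffect(msg);                   // e.g. send email, charge card
  processed.add(msg.id);
  return true;
}
```

Delivering the same message twice now triggers the side effect only once — which is exactly the property "at-least-once" brokers require from consumers.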
