Message Queues & Event-Driven Architecture
📖 Concept
Message queues decouple services by allowing them to communicate asynchronously through a broker. The sender doesn't wait for the receiver to process the message.
Popular message brokers:
| Broker | Strength | Use Case |
|---|---|---|
| RabbitMQ | Flexible routing, reliability | Task queues, pub/sub |
| Apache Kafka | High throughput, log-based | Event streaming, analytics |
| Redis (BullMQ) | Simple, fast, built on Redis | Job queues, delayed tasks |
| AWS SQS | Managed, serverless | Cloud-native queues |
Message queue patterns:
- Point-to-Point (Queue) — one sender, one consumer (task distribution)
- Pub/Sub (Topics) — one publisher, multiple subscribers (event broadcasting)
- Fan-out — one message goes to multiple queues
- Dead Letter Queue — failed messages collected for manual review
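The first two patterns above can be sketched with a toy in-memory broker. This is an illustration of the delivery semantics only — the class and method names here are made up for the example, not any real broker's API, and real brokers add persistence, acknowledgements, and retries on top:

```javascript
// Minimal in-memory broker illustrating point-to-point vs pub/sub semantics.
class Broker {
  constructor() {
    this.queues = new Map(); // queue name -> { handlers, next } (round-robin)
    this.topics = new Map(); // topic name -> subscribers (each gets a copy)
  }

  // Point-to-point: each message goes to exactly ONE consumer
  consume(queue, handler) {
    if (!this.queues.has(queue)) this.queues.set(queue, { handlers: [], next: 0 });
    this.queues.get(queue).handlers.push(handler);
  }

  send(queue, message) {
    const q = this.queues.get(queue);
    if (!q || q.handlers.length === 0) return;
    const handler = q.handlers[q.next % q.handlers.length]; // round-robin
    q.next++;
    handler(message);
  }

  // Pub/sub: every subscriber receives its own copy
  subscribe(topic, handler) {
    if (!this.topics.has(topic)) this.topics.set(topic, []);
    this.topics.get(topic).push(handler);
  }

  publish(topic, message) {
    for (const handler of this.topics.get(topic) || []) handler(message);
  }
}

// Two workers share a queue; two services both hear the same event
const broker = new Broker();
const processed = [];
broker.consume("tasks", (m) => processed.push(["worker-A", m]));
broker.consume("tasks", (m) => processed.push(["worker-B", m]));
broker.send("tasks", "task-1"); // delivered to worker-A only
broker.send("tasks", "task-2"); // delivered to worker-B only

const heard = [];
broker.subscribe("user.signup", (m) => heard.push(["email-svc", m]));
broker.subscribe("user.signup", (m) => heard.push(["analytics", m]));
broker.publish("user.signup", "u42"); // BOTH subscribers receive it
```

Note the asymmetry: the queue splits work across consumers, while the topic duplicates the event to every subscriber — that difference is what makes queues fit task distribution and topics fit event broadcasting.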
BullMQ (Node.js job queue built on Redis):
- Priority queues
- Delayed jobs
- Job retry with backoff
- Rate limiting
- Job progress tracking
- Repeatable jobs (cron-like)
- Scheduled jobs
When to use message queues:
- ✅ Email/SMS sending (async, no blocking)
- ✅ Image/video processing (CPU-heavy, offload)
- ✅ Order processing (multi-step workflow)
- ✅ Data pipelines and ETL (high throughput)
- ✅ Event broadcasting to multiple services
🏠 Real-world analogy: A message queue is like a post office. You drop a letter (message) in the mailbox (queue). The postal service (broker) ensures it reaches the recipient (consumer), even if they're not home (offline). The sender doesn't wait at the door — they go about their business.
💻 Code Example
```javascript
// Message Queues with BullMQ
const { Queue, Worker, FlowProducer } = require("bullmq");
const IORedis = require("ioredis");

// Redis connection (shared)
const connection = new IORedis({
  host: process.env.REDIS_HOST || "localhost",
  port: 6379,
  maxRetriesPerRequest: null, // Required by BullMQ
});

// 1. Create queues
const emailQueue = new Queue("emails", { connection });
const imageQueue = new Queue("image-processing", { connection });

// 2. Add jobs to queues
async function sendWelcomeEmail(userId, email) {
  await emailQueue.add(
    "welcome-email",
    { userId, email, template: "welcome" },
    {
      attempts: 3,
      backoff: { type: "exponential", delay: 2000 },
      removeOnComplete: 100, // Keep only the last 100 completed jobs
      removeOnFail: 500,
    }
  );
}

async function processImage(imageUrl, options) {
  await imageQueue.add(
    "resize",
    { imageUrl, width: options.width, height: options.height },
    {
      priority: options.priority || 5, // Lower number = higher priority
      delay: 0,
    }
  );
}

// Delayed job (send reminder in 24 hours)
async function scheduleReminder(userId) {
  await emailQueue.add(
    "reminder",
    { userId, template: "reminder" },
    { delay: 24 * 60 * 60 * 1000 } // 24 hours
  );
}

// Repeating job (daily report)
async function setupDailyReport() {
  await emailQueue.add(
    "daily-report",
    { template: "daily-report" },
    { repeat: { pattern: "0 9 * * *" } } // Every day at 9 AM (cron)
  );
}

// 3. Process jobs (workers)
const emailWorker = new Worker(
  "emails",
  async (job) => {
    console.log(`Processing email job ${job.id}: ${job.name}`);
    const { email, template } = job.data;

    // Update progress
    await job.updateProgress(10);

    // Simulate sending email
    // await sendEmail(email, template);
    console.log(`Email sent to ${email} (template: ${template})`);

    await job.updateProgress(100);
    return { sent: true, email };
  },
  {
    connection,
    concurrency: 5, // Process 5 emails simultaneously
    limiter: {
      max: 100,
      duration: 60000, // Max 100 emails per minute
    },
  }
);

const imageWorker = new Worker(
  "image-processing",
  async (job) => {
    console.log(`Processing image: ${job.data.imageUrl}`);
    // const result = await sharp(job.data.imageUrl)
    //   .resize(job.data.width, job.data.height)
    //   .toFile(outputPath);
    return { processed: true };
  },
  { connection, concurrency: 2 }
);

// 4. Event handling
emailWorker.on("completed", (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailWorker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, err.message);
  // Once all retries are exhausted, the job stays in the "failed" state --
  // implement dead letter queue logic or alerting here
});

emailWorker.on("progress", (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

// 5. Flow (dependent jobs): a parent job runs only after ALL its children complete
const flowProducer = new FlowProducer({ connection });

async function processOrderFlow(orderData) {
  // Payment and inventory reservation run in parallel; the confirmation
  // email is sent only after both have completed
  return flowProducer.add({
    name: "send-confirmation",
    queueName: "orders",
    data: orderData,
    children: [
      { name: "process-payment", queueName: "orders", data: orderData },
      { name: "reserve-inventory", queueName: "orders", data: orderData },
    ],
  });
}

// 6. Graceful shutdown
async function shutdown() {
  console.log("Shutting down workers...");
  await emailWorker.close(); // Waits for active jobs to finish
  await imageWorker.close();
  await connection.quit();
  process.exit(0);
}

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

module.exports = { emailQueue, imageQueue, sendWelcomeEmail, processImage };
```
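On retry timing: with `attempts: 3` and `backoff: { type: "exponential", delay: 2000 }` as in `sendWelcomeEmail` above, retries are spaced roughly as base × 2^(n−1). A small sketch of that schedule — the exact formula is BullMQ's internal detail, so treat this as an approximation and check the docs:

```javascript
// Approximate retry delay for an exponential backoff strategy:
// delay * 2^(attemptsMade - 1). Illustrative; verify against BullMQ docs.
function backoffDelay(baseMs, attemptsMade) {
  return baseMs * 2 ** (attemptsMade - 1);
}

// With a 2000 ms base and 3 attempts, the waits grow as:
const schedule = [1, 2, 3].map((n) => backoffDelay(2000, n));
console.log(schedule); // [ 2000, 4000, 8000 ]
```

Exponential growth here is the point: transient failures (rate limits, brief outages) get breathing room to clear, instead of being hammered with immediate retries.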
🏋️ Practice Exercise
Exercises:
- Set up BullMQ with Redis and create queues for email, image processing, and reports
- Implement email sending with retry logic — 3 attempts with exponential backoff
- Build an image processing pipeline with priority queues (user-uploaded = high, batch = low)
- Implement delayed jobs — send a reminder email 24 hours after signup
- Set up repeating jobs for daily reports and hourly health checks using cron patterns
- Build a dashboard that shows queue health: pending, active, completed, and failed job counts
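For the dashboard exercise, BullMQ's `Queue` exposes `getJobCounts()` for the raw numbers. One way to reduce those counts to a coarse health verdict — the thresholds and status names below are my own heuristic, not anything BullMQ defines, so tune them to your workload:

```javascript
// Turn raw job counts (the shape returned by queue.getJobCounts())
// into a coarse health status. Thresholds are illustrative only.
function queueHealth({ waiting = 0, active = 0, completed = 0, failed = 0, delayed = 0 }) {
  const finished = completed + failed;
  const failureRate = finished > 0 ? failed / finished : 0;
  if (failureRate > 0.1) return "unhealthy"; // >10% of finished jobs failed
  if (waiting > 1000) return "backlogged";   // consumers can't keep up
  return "healthy";
}

console.log(queueHealth({ waiting: 5, active: 2, completed: 980, failed: 3 }));    // "healthy"
console.log(queueHealth({ waiting: 5000, active: 10, completed: 100, failed: 0 })); // "backlogged"
```

A real dashboard would poll each queue's counts on an interval and expose them via an HTTP endpoint or a metrics system.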
⚠️ Common Mistakes
- Processing messages without idempotency — messages may be delivered more than once; ensure processing the same message twice has no side effects
- Not setting up dead letter queues — failed messages disappear silently; capture them for debugging and manual retry
- Not limiting concurrency on workers — processing too many jobs simultaneously can overwhelm the database or external APIs
- Storing large payloads in job data — queues should contain only references (IDs, URLs); store the actual data in the database or object storage
- Not implementing graceful shutdown for workers — `worker.close()` waits for active jobs to complete; calling `process.exit()` directly loses in-progress work
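The idempotency point deserves a sketch, since most brokers guarantee at-least-once delivery and duplicates are normal. The usual fix is to key each message with a unique ID and record processed IDs — in production that record lives in Redis or a database column with a unique constraint; the in-memory `Set` here is for illustration only:

```javascript
// Idempotent consumer: processing the same message twice must be a no-op.
// In production, persist processed IDs (e.g. Redis SETNX or a DB unique
// index) so the check survives restarts and works across worker processes.
const processedIds = new Set();
let emailsSent = 0;

function handleMessage(msg) {
  if (processedIds.has(msg.id)) return "skipped (duplicate)";
  processedIds.add(msg.id);
  emailsSent++; // side effect happens at most once per message ID
  return "processed";
}

// At-least-once delivery means the same message can arrive twice:
console.log(handleMessage({ id: "msg-1", to: "a@example.com" })); // "processed"
console.log(handleMessage({ id: "msg-1", to: "a@example.com" })); // "skipped (duplicate)"
console.log(emailsSent); // 1
```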