I’m working with BullMQ (v5+) and need to implement a worker to process messages for different chats, with the following requirements:
Rate Limit:
- Each queue should have a rate limit of 1 job per 3 seconds, but rate limits should be specific to each chat / group, not global.
Behavior:
- The worker should process jobs in the order they were added.
- Jobs should be processed with the rate limit in place.
- Each chat / group should have a separate processing order with its own rate limit.
The code below consumes an INCREDIBLE amount of memory (0.5 gb+). I can only have one copy of the application and trying to find a way to optimize the part with JS Maps multiple queues
and workers
. Seems like there should be another simpler way to implement a chat queue.
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');
const redisConnection = {
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null,
};
const redis = new Redis(redisConnection);
const createQueueName = (group) => `chat_messages__${group}`;
class ChatMessageWorker {
constructor() {
this.queues = new Map();
this.workers = new Map();
}
createWorkerForGroup(group) {
const queueName = createQueueName(group);
const queue = new Queue(queueName, {
connection: redis,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
});
const worker = new Worker(
queueName,
async (job) => {
const processingTime = new Date().toLocaleTimeString();
console.log(`>>> Processing job ${job.id} \
for group ${group} at ${processingTime} - message sent to chat!`);
},
{
connection: redis,
limiter: {
max: 1,
duration: 3000,
},
}
);
worker.on('error', (error) => {
console.error(`Worker for group ${group} encountered an error:`, error);
});
this.queues.set(group, queue);
this.workers.set(group, worker);
return queue;
}
async addJobToGroup(group, data) {
const queue = this.queues.get(group) || this.createWorkerForGroup(group);
const addTime = new Date().toLocaleTimeString();
const job = await queue.add(group, {
...data,
addedAt: addTime,
});
console.log(`Job added to group ${group} at ${addTime}`);
return job;
}
async close() {
for (const worker of this.workers.values()) {
await worker.close();
}
for (const queue of this.queues.values()) {
await queue.close();
}
}
}
// Example usage
async function runChatMessageExample() {
const chatMessageWorker = new ChatMessageWorker();
const chatIds = ['chat1', 'chat2', 'chat3'];
// Add 5 messages to each chat group
for (const chatId of chatIds) {
for (let i = 1; i <= 5; i++) {
await chatMessageWorker.addJobToGroup(chatId, {
message: `Message ${i} for ${chatId}`,
});
}
}
// Optional: Keep the process running to allow workers to process jobs
await new Promise(resolve => setTimeout(resolve, 30000));
// Close workers and queues
await chatMessageWorker.close();
}
runChatMessageExample().catch(console.error);