
node.js - BullMQ Worker with rate limits and unique group processing - Stack Overflow


I’m working with BullMQ (v5+) and need to implement a worker to process messages for different chats, with the following requirements:

Rate Limit:

  • The rate limit should be 1 job per 3 seconds, applied per chat / group rather than globally.

Behavior:

  • The worker should process jobs in the order they were added.
  • Jobs should be processed with the rate limit in place.
  • Each chat / group should have a separate processing order with its own rate limit.

The code below consumes an enormous amount of memory (0.5 GB+). I can only run a single instance of the application, and I'm trying to find a way to optimize the part that keeps JS Maps of per-group queues and workers. It seems like there should be a simpler way to implement a chat queue.

const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');
const redisConnection = {
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: null,
};
const redis = new Redis(redisConnection);
const createQueueName = (group) => `chat_messages__${group}`;
class ChatMessageWorker {
  constructor() {
    this.queues = new Map();
    this.workers = new Map();
  }
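  // A dedicated Queue and Worker pair is created for every chat group.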
  createWorkerForGroup(group) {
    const queueName = createQueueName(group);
    const queue = new Queue(queueName, {
      connection: redis,
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: true,
      },
    });
    const worker = new Worker(
      queueName,
      async (job) => {
        const processingTime = new Date().toLocaleTimeString();
        console.log(`>>> Processing job ${job.id} for group ${group} at ${processingTime} - message sent to chat!`);
      },
      {
        connection: redis,
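        // BullMQ's limiter applies to the whole queue, which is why each group gets its own queue here.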
        limiter: {
          max: 1,
          duration: 3000,
        },
      }
    );
    worker.on('error', (error) => {
      console.error(`Worker for group ${group} encountered an error:`, error);
    });
    this.queues.set(group, queue);
    this.workers.set(group, worker);
    return queue;
  }
  async addJobToGroup(group, data) {
    const queue = this.queues.get(group) || this.createWorkerForGroup(group);
    const addTime = new Date().toLocaleTimeString();
    const job = await queue.add(group, {
      ...data,
      addedAt: addTime,
    });
    console.log(`Job added to group ${group} at ${addTime}`);
    return job;
  }
  async close() {
    for (const worker of this.workers.values()) {
      await worker.close();
    }
    for (const queue of this.queues.values()) {
      await queue.close();
    }
  }
}

// Example usage
async function runChatMessageExample() {
  const chatMessageWorker = new ChatMessageWorker();
  const chatIds = ['chat1', 'chat2', 'chat3'];

  // Add 5 messages to each chat group
  for (const chatId of chatIds) {
    for (let i = 1; i <= 5; i++) {
      await chatMessageWorker.addJobToGroup(chatId, {
        message: `Message ${i} for ${chatId}`,
      });
    }
  }

  // Optional: Keep the process running to allow workers to process jobs
  await new Promise(resolve => setTimeout(resolve, 30000));

  // Close workers and queues
  await chatMessageWorker.close();
}
runChatMessageExample().catch(console.error);
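
One note on the approach above: BullMQ's limiter is enforced per queue, which is why a separate Queue/Worker pair is needed for every chat here, and that is where the memory goes. A way to keep a single queue and a single worker while still rate-limiting per chat is to gate each group manually inside the processor: keep a short-lived Redis key per group and, when a group is still inside its 3-second window, push the job back with job.moveToDelayed() and throw DelayedError. The sketch below assumes a single chat_messages queue, a group field in the job data, and a chat_rate:<group> key scheme; those names are illustrative, not part of the original code.

// Minimal sketch: one queue + one worker, per-group rate limiting via Redis keys.
// Assumes BullMQ v5 and ioredis; the key names and the `group` field are illustrative.
const { Queue, Worker, DelayedError } = require('bullmq');
const Redis = require('ioredis');

const redis = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });
const GROUP_INTERVAL_MS = 3000;

const chatQueue = new Queue('chat_messages', {
  connection: redis,
  defaultJobOptions: { removeOnComplete: true, removeOnFail: true },
});

const worker = new Worker(
  'chat_messages',
  async (job, token) => {
    const { group, message } = job.data;
    const gateKey = `chat_rate:${group}`;

    // SET NX PX acts as a per-group gate: it succeeds only if this group
    // has not been processed within the last GROUP_INTERVAL_MS.
    const acquired = await redis.set(gateKey, '1', 'PX', GROUP_INTERVAL_MS, 'NX');
    if (!acquired) {
      // The group is still rate limited: re-schedule the job for when the
      // gate expires and tell BullMQ this is not a failure.
      const remaining = Math.max(await redis.pttl(gateKey), 0);
      await job.moveToDelayed(Date.now() + remaining + 50, token);
      throw new DelayedError();
    }

    console.log(`>>> Processing job ${job.id} for group ${group}: ${message}`);
  },
  { connection: redis, concurrency: 5 }
);

// Producer side: all chats share one queue; the group travels in the job data.
async function addChatMessage(group, message) {
  return chatQueue.add('chat-message', { group, message });
}

This keeps memory flat (one queue, one worker, one short-lived Redis key per active group), but strict FIFO inside a group is no longer guaranteed once jobs start being re-delayed; if exact per-group ordering matters, the delay could instead be derived from a per-group counter so every message gets its own time slot. BullMQ Pro's groups feature covers per-group ordering and rate limits natively, but it is not part of the open-source package.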