te')); return $arr; } /* 遍历用户所有主题 * @param $uid 用户ID * @param int $page 页数 * @param int $pagesize 每页记录条数 * @param bool $desc 排序方式 TRUE降序 FALSE升序 * @param string $key 返回的数组用那一列的值作为 key * @param array $col 查询哪些列 */ function thread_tid_find_by_uid($uid, $page = 1, $pagesize = 1000, $desc = TRUE, $key = 'tid', $col = array()) { if (empty($uid)) return array(); $orderby = TRUE == $desc ? -1 : 1; $arr = thread_tid__find($cond = array('uid' => $uid), array('tid' => $orderby), $page, $pagesize, $key, $col); return $arr; } // 遍历栏目下tid 支持数组 $fid = array(1,2,3) function thread_tid_find_by_fid($fid, $page = 1, $pagesize = 1000, $desc = TRUE) { if (empty($fid)) return array(); $orderby = TRUE == $desc ? -1 : 1; $arr = thread_tid__find($cond = array('fid' => $fid), array('tid' => $orderby), $page, $pagesize, 'tid', array('tid', 'verify_date')); return $arr; } function thread_tid_delete($tid) { if (empty($tid)) return FALSE; $r = thread_tid__delete(array('tid' => $tid)); return $r; } function thread_tid_count() { $n = thread_tid__count(); return $n; } // 统计用户主题数 大数量下严谨使用非主键统计 function thread_uid_count($uid) { $n = thread_tid__count(array('uid' => $uid)); return $n; } // 统计栏目主题数 大数量下严谨使用非主键统计 function thread_fid_count($fid) { $n = thread_tid__count(array('fid' => $fid)); return $n; } ?>javascript - Can I wait for a process to complete when consuming RabbitMQ messages with Node.js? - Stack Overflow
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

javascript - Can I wait for a process to complete when consuming RabbitMQ messages with Node.js? - Stack Overflow

programmeradmin4浏览0评论

I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.

I have tried making the .consume() callback function async and adding await to the message processing function. I have tried wrapping an await new Promise within the .consume() callback around processMessage. I have tried adding await to the line that calls channel.consume. Nothing changes the behavior.

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage line, it waits to execute process (30-60 seconds) before processing the next string.

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

So I basically need something that functions like this, but with listening to the queue in RabbitMQ.

I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.

I have tried making the .consume() callback function async and adding await to the message processing function. I have tried wrapping an await new Promise within the .consume() callback around processMessage. I have tried adding await to the line that calls channel.consume. Nothing changes the behavior.

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage line, it waits to execute process (30-60 seconds) before processing the next string.

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

So I basically need something that functions like this, but with listening to the queue in RabbitMQ.

Share Improve this question asked Mar 9, 2020 at 19:16 Kenyon RosewallKenyon Rosewall 731 silver badge3 bronze badges 4
  • What value of prefetch do you pass into your consumeFromQueue function? – shkaper Commented Mar 9, 2020 at 19:23
  • @shkaper Just null for now. – Kenyon Rosewall Commented Mar 9, 2020 at 19:25
  • Prefetch count is what limits the number of messages that can be processed at a time by a consumer. Try limiting it to 3-5. – shkaper Commented Mar 9, 2020 at 19:35
  • If I set prefetch, it doesn't seem that I can get any more messages until I restart my node script. Also, if I process 5 messages at once, I am unable to verify whether any are duplicates, and so could run into issues there. I am just wondering if there is anyway to wait for the process to finish before it gets a new message. – Kenyon Rosewall Commented Mar 9, 2020 at 19:46
Add a ment  | 

3 Answers 3

Reset to default 11

If you want to limit the number of messages being processed by a consumer at any given time, use channel.prefetch():

The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.

That is, if you only want to be able process a single message at a time before moving on to the next, set channel.prefetch(1)

If someone needs a plete answer:

You need to mix channel.prefetch(1), and { noAck: false } to consume one to one messages:

Simple Example:

const connection = await amqp.connect('amqp://localhost')    
const channel = await connection.createChannel()

await channel.assertQueue(queue, { durable: false })

// Set the number of messages to consume:
channel.prefetch(1)

await channel.consume(
      'QUEUE_NAME',
      async message => {
         if (message) {
            // YOUR ASYNC/AWAIT CODE

            // And then, ack the message manually:
            channel.ack(message)
         }
      },

      { noAck: false } // Set noAck to false to manually acknowledge messages
)

This is the way to consume a message at a time.

moving on to the next, set channel.prefetch(1)

channel.prefetch(1)
发布评论

评论列表(0)

  1. 暂无评论