I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.
I have tried making the .consume()
callback function async and adding await
to the message processing function. I have tried wrapping an await new Promise
within the .consume()
callback around processMessage
. I have tried adding await
to the line that calls channel.consume
. Nothing changes the behavior.
#!/usr/bin/env node
const amqp = require('amqplib');
const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, { durable: durable, autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck});
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}
exports.consumeFromQueue = consumeFromQueue;
As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage
line, it waits to execute process (30-60 seconds) before processing the next string.
(async () => {
for (let i=0; i<urls.length; i++) {
await processMessage(urls[i]);
}
})();
So I basically need something that functions like this, but with listening to the queue in RabbitMQ.
I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.
I have tried making the .consume()
callback function async and adding await
to the message processing function. I have tried wrapping an await new Promise
within the .consume()
callback around processMessage
. I have tried adding await
to the line that calls channel.consume
. Nothing changes the behavior.
#!/usr/bin/env node
const amqp = require('amqplib');
const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, { durable: durable, autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck});
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}
exports.consumeFromQueue = consumeFromQueue;
As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage
line, it waits to execute process (30-60 seconds) before processing the next string.
(async () => {
for (let i=0; i<urls.length; i++) {
await processMessage(urls[i]);
}
})();
So I basically need something that functions like this, but with listening to the queue in RabbitMQ.
Share Improve this question asked Mar 9, 2020 at 19:16 Kenyon RosewallKenyon Rosewall 731 silver badge3 bronze badges 4-
What value of
prefetch
do you pass into yourconsumeFromQueue
function? – shkaper Commented Mar 9, 2020 at 19:23 - @shkaper Just null for now. – Kenyon Rosewall Commented Mar 9, 2020 at 19:25
- Prefetch count is what limits the number of messages that can be processed at a time by a consumer. Try limiting it to 3-5. – shkaper Commented Mar 9, 2020 at 19:35
- If I set prefetch, it doesn't seem that I can get any more messages until I restart my node script. Also, if I process 5 messages at once, I am unable to verify whether any are duplicates, and so could run into issues there. I am just wondering if there is anyway to wait for the process to finish before it gets a new message. – Kenyon Rosewall Commented Mar 9, 2020 at 19:46
3 Answers
Reset to default 11If you want to limit the number of messages being processed by a consumer at any given time, use channel.prefetch():
The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.
That is, if you only want to be able process a single message at a time before moving on to the next, set channel.prefetch(1)
If someone needs a plete answer:
You need to mix channel.prefetch(1), and { noAck: false } to consume one to one messages:
Simple Example:
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertQueue(queue, { durable: false })
// Set the number of messages to consume:
channel.prefetch(1)
await channel.consume(
'QUEUE_NAME',
async message => {
if (message) {
// YOUR ASYNC/AWAIT CODE
// And then, ack the message manually:
channel.ack(message)
}
},
{ noAck: false } // Set noAck to false to manually acknowledge messages
)
This is the way to consume a message at a time.
moving on to the next, set channel.prefetch(1)
channel.prefetch(1)