I have questions/problems on understanding/using RabbitMQ with MassTransit in a C# application.
We need to consume our data in order and we are facing some optimization problems.
We have a publisher who's getting data in a database (15.000 rows) at each 1,5s.
This publisher send messages using a function that group data in messages and send them on 4 queues. We are grouping data in messages following the rule : (data.Id % numberOfQueues) + 1
. Currently our numberOfQueues equal 4.
Our consumer, in his program.cs file, declare and configure 4 queues on a direct type exchange. To optimize the fluidity between the broker and our consumer we want to set a prefetchCount > 1
. So that the broker delivers multiple messages at once, and we set a concurencyLimit to 1 so that we have our four queues consuming one message at time (no parallelism).
I try to put the prefetchCount
to 1.000. I can see in the RMQ interface that I have 1.000 unacked messages:
I put some logs in my consumer to see if the messages are consumed in order:
On the screen you can see that we are at message id 951 and until now all messages are in the right order. But sometimes we have that:
As you can see on the second screen from the logs of my consumer, the message id 1925 was published at: 40:3:523 and despite this, it has been consumed after the message id 1926 which was published after it.
So it's globally working good (95% good ordered) (estimation) and 5% wrong (= not ordered). But why ? I don't want to have these 5% wrong. How can I handle it? I see that mass transit use a task scheduler to tell the consumer what message to consume form the pack of messages in the ram. If I use native RMQ instead of using mass transit, will this problem be solved?
Moreover, when I stop my process (the micro service which have the consume task), the unacked messages which were in the ram of my consumer due to the pretechCount (1.000) seems to returns into the queue but in a shuffle order. Look what I get when I stop/start my worker:
1/ Messages are ordered with a 5% estimation of non-ordered. I pause the worker with 315 message in the queue and 1000 unacked message:
- queue state before stop
2/ I stop my processes so unacked messages returns inside the queue:
- queue state after stop
3/ I start my publisher/consumer:
- consumer logs after restart
I now have 95% non-ordered. What happened?
With a prefetchCount
of 1.000 and a concurencyLimit
set to 1
, i was expecting that my broker delivers 1.000 at once to the RAM of my consumer and then I expect my consumer to consume message one by one, due to concurencyLimi
t, in the right order.
How can I achieve that?
If I make a batch of 1.000 (equivalent to prefetchCount
), will my messages be consumed in the right order?