I have an async PUB socket as follows:
import asyncio
import zmq
import zmq.asyncio
async def serve(address: str):
context = zmq.asyncio.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind(address)
count = 0
while True:
try:
print("sending message %d" % count)
await socket.send(b"message %d" % count)
await asyncio.sleep(1)
count += 1
except asyncio.CancelledError:
break
socket.close()
context.term()
if __name__ == "__main__":
asyncio.run(serve("tcp://*:5555"))
And an async SUB socket as follows:
import asyncio
import zmq
import zmq.asyncio
DELAY = 5
async def consume(address: str):
context = zmq.asyncio.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect(address)
while True:
try:
message = await socket.recv()
print(f"received: {message.decode('utf-8')}")
await asyncio.sleep(DELAY) # simulating message processing
except asyncio.CancelledError:
break
socket.close()
context.term()
if __name__ == "__main__":
asyncio.run(consume("tcp://localhost:5555"))
Both with ZMQ_CONFLATE socket option set to 1 to prevent queueing unnecessary messages as I only need the most recent of them.
When this two scripts are running what I expect is the next print of the SUB script to be the most recent print of the PUB script, but actually the SUB script is DELAY
times below PUB script.
For example, considering DELAY = 5
, when the SUB script reaches the socket.recv() coroutine and the most recent PUB script print was sending message 23
I expect the SUB script to print received: message 23
but instead it prints received: message 18
.
If I use a non-asyncio script like follows:
import time
import zmq
import zmq.asyncio
DELAY = 5
def consume(address: str):
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect(address)
while True:
try:
message = socket.recv()
print(f"received: {message.decode('utf-8')}")
time.sleep(DELAY)
except KeyboardInterrupt:
break
socket.close()
context.term()
if __name__ == "__main__":
consume("tcp://localhost:5555")
It actually does what I expect.
I would like to have a clearer roadmap to debug my issue, as I need it to be totally asyncio in order to integrate it into another asyncio-applications and prevent the use of threadings.
Am I doing something wrong or the script behaviour is normal and I ignore things that would help me understand it?
Note: The fact that the PUB socket is asynchronous does not affect my issue, I already tested a normal non-async version of the PUB script and it doesn't change a thing.