最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - PyZMQ asyncio SUB socket not receiving last message even with CONFLATE set to 1 - Stack Overflow

programmeradmin1浏览0评论

I have an async PUB socket as follows:

import asyncio

import zmq
import zmq.asyncio

async def serve(address: str):
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.PUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.bind(address)
    count = 0
    while True:
        try:
            print("sending message %d" % count)
            await socket.send(b"message %d" % count)
            await asyncio.sleep(1)
            count += 1
        except asyncio.CancelledError:
            break
    socket.close()
    context.term()

if __name__ == "__main__":
    asyncio.run(serve("tcp://*:5555"))

And an async SUB socket as follows:

import asyncio
import zmq
import zmq.asyncio

DELAY = 5

async def consume(address: str):
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    socket.connect(address)
    while True:
        try:
            message = await socket.recv()
            print(f"received: {message.decode('utf-8')}")
            await asyncio.sleep(DELAY)  # simulating message processing
        except asyncio.CancelledError:
            break
    socket.close()
    context.term()

if __name__ == "__main__":
    asyncio.run(consume("tcp://localhost:5555"))

Both with ZMQ_CONFLATE socket option set to 1 to prevent queueing unnecessary messages as I only need the most recent of them.

When this two scripts are running what I expect is the next print of the SUB script to be the most recent print of the PUB script, but actually the SUB script is DELAY times below PUB script.

For example, considering DELAY = 5, when the SUB script reaches the socket.recv() coroutine and the most recent PUB script print was sending message 23 I expect the SUB script to print received: message 23 but instead it prints received: message 18.

If I use a non-asyncio script like follows:

import time
import zmq
import zmq.asyncio

DELAY = 5

def consume(address: str):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    socket.connect(address)
    while True:
        try:
            message = socket.recv()
            print(f"received: {message.decode('utf-8')}")
            time.sleep(DELAY)
        except KeyboardInterrupt:
            break
    socket.close()
    context.term()

if __name__ == "__main__":
    consume("tcp://localhost:5555")

It actually does what I expect.

I would like to have a clearer roadmap to debug my issue, as I need it to be totally asyncio in order to integrate it into another asyncio-applications and prevent the use of threadings.

Am I doing something wrong or the script behaviour is normal and I ignore things that would help me understand it?

Note: The fact that the PUB socket is asynchronous does not affect my issue, I already tested a normal non-async version of the PUB script and it doesn't change a thing.

发布评论

评论列表(0)

  1. 暂无评论