
python - Using ZMQ, which pattern and framework to use in such a scenario ? Two computers, bidirectional, not knowing which star


My application is split across two computers (A, which hosts a GUI, and B, which runs some complex processing). Here are the constraints:

PROBLEMS

  1. Both computers need to communicate with each other (bidirectional communication) at any given rate, asynchronously (meaning that A could send multiple messages in a row to B, and vice versa).
  2. Listening for and processing messages should not block.
  3. Either computer could crash or be closed. This should not be a problem for the other one, which should instead reconnect once the closed one is alive again.
  4. [optional] We don't know which process starts first, so the connection can be established in either order, which would mean that there is no real server and client.

IDEAS

To resolve these issues, I thought of doing:

  1. Using two different PUB-SUB sockets per computer, to separate the handling of listener and sender threads. I thought about other patterns (PUSH-DEALER, etc) but I feel like the simplest PUB-SUB is the best one for this configuration?
  2. Simple multithreading, but from this guide it seems I shouldn't share any given socket between threads, so I was wondering how to do this cleanly.
  3. Using heartbeats + liveness to constantly check if any process is alive or not, and closing + restarting context and sockets if it finds that the other computer is not responding anymore.
  4. I am not so sure about this part, as the guide says:

Say you have two nodes doing this, in this order:

  • Subscriber connects to an endpoint and receives and counts messages.

  • Publisher binds to an endpoint and immediately sends 1,000 messages.

Then the subscriber will most likely not receive anything. You’ll blink, check that you set a correct filter and try again, and the subscriber will still not receive anything.

Which means that the order of connection actually matters... Especially after a crash of A or B.
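The heartbeat + liveness idea in point 3 above could be kept separate from the sockets themselves. The following is only a rough sketch under assumptions: `LivenessTracker`, `interval` and `max_missed` are hypothetical names that do not come from pyzmq, and the class does bookkeeping only. Sending the heartbeats and closing or recreating the sockets would still be up to the surrounding code.

```python
import time


class LivenessTracker:
    """Tracks whether a peer is still alive, based on when it was last heard from.

    Hypothetical helper, not part of pyzmq: it only does bookkeeping and
    leaves sending heartbeats and rebuilding sockets to the caller.
    """

    def __init__(self, interval=1.0, max_missed=3, clock=time.monotonic):
        self.interval = interval      # seconds between expected heartbeats
        self.max_missed = max_missed  # silent intervals tolerated before "dead"
        self.clock = clock            # injectable so the logic can be tested
        self.last_seen = clock()

    def on_message(self):
        """Call for every message received from the peer, heartbeat or not."""
        self.last_seen = self.clock()

    def is_alive(self):
        """True while the peer has been heard from within the allowed window."""
        return self.clock() - self.last_seen < self.interval * self.max_missed


# Demo with a fake clock so that no real waiting is needed.
now = [0.0]
tracker = LivenessTracker(interval=1.0, max_missed=3, clock=lambda: now[0])

now[0] = 2.5
alive_before_timeout = tracker.is_alive()   # 2.5 s of silence, limit is 3 s

now[0] = 3.5
alive_after_timeout = tracker.is_alive()    # 3.5 s of silence, peer presumed dead

tracker.on_message()                        # the peer is heard from again
alive_after_message = tracker.is_alive()
```

In a listener loop, `on_message()` would presumably be called for anything received from the peer, and `is_alive()` checked each time the poll times out. Note that `zmq.Poller.poll()` takes its timeout in milliseconds.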

CODE

I made a first draft that kind of works for constraints 1. and 2., but as you'll see it is a mess, and I am not sure how to scale it and improve it before even trying to use heartbeats...

COMPUTER A

import json
import threading
import time

import zmq


class A:
  def __init__(self, A_to_B_addr, B_to_A_addr):
    self.A_to_B_addr = A_to_B_addr
    self.B_to_A_addr = B_to_A_addr
    self.A_to_B_socket = None
    self.B_to_A_socket = None
    self.poller = None
    self.context = None

    self.setup_zmq_pub()


  def setup_zmq_pub(self):
    # Create a ZMQ context
    self.context = zmq.Context()
    # Set up a publisher socket to send output messages
    self.A_to_B_socket = self.context.socket(zmq.PUB)
    self.A_to_B_socket.bind(self.A_to_B_addr)

    # Setup poller to handle socket events
    self.poller = zmq.Poller()
    self.poller.register(self.A_to_B_socket, zmq.POLLOUT)

    # Wait for a short time to make sure it is set up
    time.sleep(1)

    self.send_to_B("A ready")
    self.setup_zmq_sub()

  def setup_zmq_sub(self):
    # Set up a subscriber socket to listen for incoming messages
    self.B_to_A_socket = self.context.socket(zmq.SUB)
    self.B_to_A_socket.connect(self.B_to_A_addr)

    # Subscribe to all messages
    self.B_to_A_socket.setsockopt_string(zmq.SUBSCRIBE, "")

    self.poller.register(self.B_to_A_socket, zmq.POLLIN)

    print(f"Listening on {self.B_to_A_addr} and sending outputs to {self.A_to_B_addr}")

    # Run the listener in a background thread (start() returns None, so nothing is kept)
    threading.Thread(target=self.zmq_listener).start()

  def zmq_listener(self):
    """Continuously listens to B, to handle requests"""
    while True:
        socks = dict(self.poller.poll())
        # Check if we can read from the B_to_A_socket
        if self.B_to_A_socket in socks and socks[self.B_to_A_socket] == zmq.POLLIN:
            # Receive a message
            message = self.B_to_A_socket.recv_string()

            # Deserialize the message from JSON format
            try:
                decoded_message = json.loads(message)
                print(f"A Received message as JSON: {decoded_message}")

                # Decodes the message and handles it
                self.on_A_request(decoded_message)
            except json.JSONDecodeError:
                print("Error: Received message is not a valid JSON.")

  def send_to_B(self, message):
    """Sends a zmq message to B computer"""
    # Retries until the socket is ready to send (PUB gives no delivery confirmation)
    while True:
        socks = dict(self.poller.poll(timeout=1))
        # Check if we can send on the A_to_B_socket
        if self.A_to_B_socket in socks and socks[self.A_to_B_socket] == zmq.POLLOUT:
            # Prepare the JSON object to send
            json_message = json.dumps({"message": message})
            print(f"Sending to B: {message}")

            # Send the JSON object
            self.A_to_B_socket.send_string(json_message)
            break
        else:
            print(f"Message not sent: {message}")
            time.sleep(1)

COMPUTER B

Very similar to A, with the connections reversed, a slightly different connection setup, and a "ready" message at startup:

class B:
  def __init__(self, B_to_A_addr, A_to_B_addr):
    self.A_to_B_addr = A_to_B_addr
    self.B_to_A_addr = B_to_A_addr
    self.A_to_B_socket = None
    self.B_to_A_socket = None
    self.poller = None
    self.context = None

    self.A_ready = False

    self.setup_zmq()
    self.send_to_A("B ready")


  def setup_zmq(self):
    # Create a ZMQ context
    self.context = zmq.Context()

    # Set up a subscriber socket to listen for incoming messages
    self.A_to_B_socket = self.context.socket(zmq.SUB)
    self.A_to_B_socket.connect(self.A_to_B_addr)

    # Subscribe to all messages (empty string means receive all)
    self.A_to_B_socket.setsockopt_string(zmq.SUBSCRIBE, "")

    # Set up a publisher socket to send output messages
    self.B_to_A_socket = self.context.socket(zmq.PUB)
    self.B_to_A_socket.bind(self.B_to_A_addr)

    # Set up the poller
    self.poller = zmq.Poller()
    self.poller.register(self.A_to_B_socket, zmq.POLLIN)
    self.poller.register(self.B_to_A_socket, zmq.POLLOUT)

    print(f"Listening on {self.A_to_B_addr} and sending outputs to {self.B_to_A_addr}")

    # Run the listener in a background thread (start() returns None, so nothing is kept)
    threading.Thread(target=self.zmq_listener).start()

  def on_B_request(self, request):
    """Handles a request from A"""

    # Removes some unexpected behavior
    request = request['message']
    if "A ready" in request:
        self.A_ready = True
    [...]

  def send_to_A(self, message):
    """Sends a zmq message to A computer"""
    while self.set_zmq:
        socks = dict(self.poller.poll())
        if self.A_ready and self.B_to_A_socket in socks and socks[self.B_to_A_socket] == zmq.POLLOUT:
        [...]

  

The rest is similar.

So in this context, we need to start B first, wait 1-2 seconds (to make sure that setup_zmq() is done), and then start A.

As you can see, the code is surely far more complicated than it needs to be and too tedious (using time.sleep() is surely a very bad idea, and polling is probably not necessary, but I didn't manage to make it work without it), which makes it hard to extend to points 3. and 4.
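On the multithreading concern (idea 2), one common arrangement is to let a single thread own the socket and have every other thread hand messages to it through a queue. The sketch below only illustrates that hand-off and is not a drop-in replacement for the classes above. `SocketOwner` is a hypothetical name, and `send_func` is a stand-in for something like `socket.send_string`, so the example runs without any network or pyzmq.

```python
import queue
import threading


class SocketOwner:
    """Runs one thread that is the only caller of `send_func`.

    `send_func` is a stand-in for something like `socket.send_string`;
    other threads never touch the socket, they only put messages on a queue.
    """

    _STOP = object()  # sentinel that tells the worker thread to exit

    def __init__(self, send_func):
        self._send_func = send_func
        self._outbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message):
        """Thread-safe: may be called from the GUI thread or any other thread."""
        self._outbox.put(message)

    def close(self):
        """Lets queued messages be sent, then stops the worker thread."""
        self._outbox.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self._outbox.get()
            if message is self._STOP:
                break
            self._send_func(message)


# Demo: a list stands in for the network so the sketch runs on its own.
sent = []
owner = SocketOwner(sent.append)
for i in range(3):
    owner.send(f"message {i}")
owner.close()
```

With a real socket, the worker thread would presumably also create the socket itself and do the receiving, so that the socket never crosses a thread boundary.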

Do you have any input on what could be done better here, and on how to resolve the other issues discussed above?
