My application is split across two computers (A, which runs a GUI, and B, which does some complex processing). Here are some constraints:
PROBLEMS
1. Both computers need to communicate with each other (bidirectional communication) at any rate, asynchronously (meaning that A could send multiple messages in a row to B, and vice versa).
2. Listening for and processing messages should be non-blocking.
3. Either computer could crash or be closed; this shouldn't be a problem for the other one, which should instead reconnect once the closed one is alive again.
4. [optional] We don't know which process starts first, so the connection can be made in either order, which means there is no real server and client.
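For point 3, it may help to know that libzmq already retries a `connect()` by itself when the bound peer goes away and comes back (the retry interval is set by the `ZMQ_RECONNECT_IVL` and `ZMQ_RECONNECT_IVL_MAX` socket options), and for point 4 that a TCP `connect()` may be issued before the other side has called `bind()`. Explicit backoff only matters if you close and recreate the sockets yourself after declaring the peer dead. A minimal sketch of a capped exponential backoff schedule for that case; the function name and default values are illustrative assumptions, not part of any library:

```python
def reconnect_delays(initial=0.1, maximum=5.0, factor=2):
    """Yield successive retry delays (in seconds) for re-creating sockets.

    Each delay is `factor` times the previous one, capped at `maximum`,
    so a peer that stays down is not hammered with reconnection attempts.
    """
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, maximum)
```

For example, the first seven values of `reconnect_delays(1, 30)` are `1, 2, 4, 8, 16, 30, 30`; after a successful reconnection you would start a fresh generator.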
IDEAS
To resolve these issues, I thought of doing:
- Using two sockets per computer (one PUB and one SUB), to separate the handling of the listener and sender threads. I thought about other patterns (PUSH-DEALER, etc.), but I feel like the simple PUB-SUB is the best one for this configuration?
- Simple multithreading, but from the ZeroMQ guide it seems that I shouldn't use a given socket from several threads, so I was wondering how to do this cleanly.
- Using heartbeats + liveness checks to continuously verify whether the other process is alive, and closing and restarting the context and sockets if the other computer stops responding.
- I am not so sure about this last part, because of this passage:
Say you have two nodes doing this, in this order:
Subscriber connects to an endpoint and receives and counts messages.
Publisher binds to an endpoint and immediately sends 1,000 messages.
Then the subscriber will most likely not receive anything. You’ll blink, check that you set a correct filter and try again, and the subscriber will still not receive anything.
This means that the order of connection actually matters, especially after a crash of A or B.
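The heartbeat + liveness idea corresponds closely to the "Paranoid Pirate" pattern described in the ZeroMQ guide. A minimal, transport-agnostic sketch of just the bookkeeping, assuming illustrative values of a 1 s heartbeat interval and 3 silent intervals before the peer is declared dead; `PeerLiveness` and its methods are hypothetical names, and the injectable `clock` exists only so the logic can be tested without waiting:

```python
import time


class PeerLiveness:
    """Track whether the peer is alive.

    Any message received from the peer (data or heartbeat) resets the
    liveness counter; each heartbeat interval that passes in silence
    decrements it; at zero the peer is considered dead.
    """

    def __init__(self, interval=1.0, liveness=3, clock=time.monotonic):
        self.interval = interval        # seconds between heartbeats
        self.max_liveness = liveness    # silent intervals tolerated
        self.clock = clock
        self.liveness = liveness
        now = clock()
        self.next_check = now + interval
        self.next_heartbeat = now + interval

    def on_message(self):
        """Call whenever anything arrives from the peer."""
        self.liveness = self.max_liveness
        self.next_check = self.clock() + self.interval

    def heartbeat_due(self):
        """Return True (once per interval) when we should send a heartbeat."""
        now = self.clock()
        if now >= self.next_heartbeat:
            self.next_heartbeat = now + self.interval
            return True
        return False

    def peer_dead(self):
        """Decrement liveness for every elapsed silent interval."""
        now = self.clock()
        while now >= self.next_check and self.liveness > 0:
            self.liveness -= 1
            self.next_check += self.interval
        return self.liveness == 0
```

In a listener loop this would be driven by polling with a timeout of roughly `interval` (pyzmq's `Poller.poll()` takes the timeout in milliseconds): call `on_message()` for anything received, send a heartbeat whenever `heartbeat_due()` is true, and close and recreate the socket when `peer_dead()` is true (this is simplest when a single thread owns both sockets). Because heartbeats are re-sent continuously, a heartbeat lost to the slow-joiner effect is simply followed by the next one, so each side still notices the other regardless of which one started first; application messages sent before that point can still be lost over PUB-SUB.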
CODE
I made a first draft that kind of works for points 1. and 2., but as you'll see it is a mess, and I am not sure how to scale it and improve it before even trying to add heartbeats...
COMPUTER A

    import json
    import threading
    import time

    import zmq


    class A:
        def __init__(self, A_to_B_addr, B_to_A_addr):
            self.A_to_B_addr = A_to_B_addr
            self.B_to_A_addr = B_to_A_addr
            self.A_to_B_socket = None
            self.B_to_A_socket = None
            self.send_poller = None
            self.recv_poller = None
            self.listener_thread = None
            self.context = None
            self.setup_zmq_pub()

        def setup_zmq_pub(self):
            # Create a ZMQ context
            self.context = zmq.Context()
            # Set up a publisher socket to send output messages
            self.A_to_B_socket = self.context.socket(zmq.PUB)
            self.A_to_B_socket.bind(self.A_to_B_addr)
            # Poller used only by the sending (main) thread
            self.send_poller = zmq.Poller()
            self.send_poller.register(self.A_to_B_socket, zmq.POLLOUT)
            # Give B's SUB socket time to connect: messages published before
            # the subscription is in place are dropped ("slow joiner")
            time.sleep(1)
            self.send_to_B("A ready")
            self.setup_zmq_sub()

        def setup_zmq_sub(self):
            # Set up a subscriber socket to listen for incoming messages
            self.B_to_A_socket = self.context.socket(zmq.SUB)
            self.B_to_A_socket.connect(self.B_to_A_addr)
            # Subscribe to all messages
            self.B_to_A_socket.setsockopt_string(zmq.SUBSCRIBE, "")
            # Separate poller for the listener thread: it only watches the SUB
            # socket, so the PUB socket is never touched by two threads
            self.recv_poller = zmq.Poller()
            self.recv_poller.register(self.B_to_A_socket, zmq.POLLIN)
            print(f"Listening on {self.B_to_A_addr} and sending outputs to {self.A_to_B_addr}")
            # Keep a reference to the thread (Thread.start() returns None)
            self.listener_thread = threading.Thread(target=self.zmq_listener)
            self.listener_thread.start()

        def zmq_listener(self):
            """Continuously listens to B, to handle requests"""
            while True:
                socks = dict(self.recv_poller.poll())
                # Check if we can read from the B_to_A_socket
                if socks.get(self.B_to_A_socket, 0) & zmq.POLLIN:
                    # Receive a message
                    message = self.B_to_A_socket.recv_string()
                    # Deserialize the message from JSON format
                    try:
                        decoded_message = json.loads(message)
                        print(f"A Received message as JSON: {decoded_message}")
                        # Decodes the message and handles it
                        self.on_A_request(decoded_message)
                    except json.JSONDecodeError:
                        print("Error: Received message is not a valid JSON.")

        def on_A_request(self, request):
            """Handles a request from B"""
            pass  # application-specific handling goes here

        def send_to_B(self, message):
            """Sends a zmq message to B computer"""
            # Note: POLLOUT only means the PUB socket can accept a message. It
            # does NOT mean B received it: PUB never blocks and silently drops
            # messages when no subscriber is connected.
            while True:
                # The timeout is in milliseconds
                socks = dict(self.send_poller.poll(timeout=1))
                # Check if we can send on the A_to_B_socket
                if socks.get(self.A_to_B_socket, 0) & zmq.POLLOUT:
                    # Prepare the JSON object to send
                    json_message = json.dumps({"message": message})
                    print(f"Sending to B: {message}")
                    # Send the JSON object
                    self.A_to_B_socket.send_string(json_message)
                    break
                else:
                    print(f"Message not sent: {message}")
                    time.sleep(1)

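Regarding the rule of not sharing a socket between threads: one common way to respect it is to let exactly one thread own the outgoing socket and have all other threads hand messages over through a thread-safe `queue.Queue`. A minimal sketch under that assumption; `SenderThread` and `post()` are hypothetical names, and `send` can be any callable, for instance a PUB socket's `send_string`, as long as the socket is created before this thread starts and is afterwards used only by it:

```python
import queue
import threading


class SenderThread:
    """Owns the outgoing socket: only this thread ever calls `send`.

    Other threads hand messages over through a thread-safe queue.
    """

    _STOP = object()  # sentinel telling the thread to exit

    def __init__(self, send):
        self._send = send
        self._outbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def post(self, message):
        """Thread-safe: may be called from any thread."""
        self._outbox.put(message)

    def stop(self):
        """Send everything already queued, then stop the thread."""
        self._outbox.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self._outbox.get()  # blocks until there is work
            if message is self._STOP:
                break
            self._send(message)
```

Since a `queue.Queue` cannot be registered with a `zmq.Poller`, this variant dedicates a thread to sending; a single-threaded alternative is one event loop that polls the SUB socket with a short timeout and drains the queue between polls.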
COMPUTER B
Very similar to A, with the connections reversed, a slightly different connection setup and a "ready" message at startup:

    import json
    import threading

    import zmq


    class B:
        def __init__(self, B_to_A_addr, A_to_B_addr):
            self.A_to_B_addr = A_to_B_addr
            self.B_to_A_addr = B_to_A_addr
            self.A_to_B_socket = None
            self.B_to_A_socket = None
            self.recv_poller = None
            self.send_poller = None
            self.listener_thread = None
            self.context = None
            self.A_ready = False
            self.setup_zmq()
            self.send_to_A("B ready")

        def setup_zmq(self):
            # Create a ZMQ context
            self.context = zmq.Context()
            # Set up a subscriber socket to listen for incoming messages
            self.A_to_B_socket = self.context.socket(zmq.SUB)
            self.A_to_B_socket.connect(self.A_to_B_addr)
            # Subscribe to all messages (empty string means receive all)
            self.A_to_B_socket.setsockopt_string(zmq.SUBSCRIBE, "")
            # Set up a publisher socket to send output messages
            self.B_to_A_socket = self.context.socket(zmq.PUB)
            self.B_to_A_socket.bind(self.B_to_A_addr)
            # One poller per thread: the listener only watches the SUB socket,
            # the sending thread only watches the PUB socket
            self.recv_poller = zmq.Poller()
            self.recv_poller.register(self.A_to_B_socket, zmq.POLLIN)
            self.send_poller = zmq.Poller()
            self.send_poller.register(self.B_to_A_socket, zmq.POLLOUT)
            print(f"Listening on {self.A_to_B_addr} and sending outputs to {self.B_to_A_addr}")
            # Keep a reference to the thread (Thread.start() returns None)
            self.listener_thread = threading.Thread(target=self.zmq_listener)
            self.listener_thread.start()

        def on_B_request(self, request):
            """Handles a request from A"""
            # Extract the payload from the {"message": ...} envelope
            request = request['message']
            if "A ready" in request:
                self.A_ready = True
            [...]

        def send_to_A(self, message):
            """Sends a zmq message to A computer"""
            while True:
                socks = dict(self.send_poller.poll())
                if self.A_ready and socks.get(self.B_to_A_socket, 0) & zmq.POLLOUT:
                    [...]

The rest is similar.
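`on_B_request` detects readiness with a substring test (`"A ready" in request`), which also fires for any ordinary payload that happens to contain that text. A sketch of an alternative, assuming a hypothetical envelope with an explicit `"type"` field (`"ready"`, `"heartbeat"`, `"data"`, ...) and a small dispatcher; `encode`, `Dispatcher` and the field names are illustrative, not an existing API:

```python
import json


def encode(msg_type, **payload):
    """Build a JSON envelope with an explicit message type."""
    return json.dumps({"type": msg_type, **payload})


class Dispatcher:
    """Route decoded messages to handlers by their "type" field."""

    def __init__(self):
        self._handlers = {}

    def on(self, msg_type, handler):
        """Register `handler(message_dict)` for one message type."""
        self._handlers[msg_type] = handler

    def dispatch(self, raw):
        """Decode `raw` and call its handler; return False if unhandled."""
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            print("Error: received message is not valid JSON.")
            return False
        if not isinstance(message, dict):
            print("Error: expected a JSON object.")
            return False
        handler = self._handlers.get(message.get("type"))
        if handler is None:
            print(f"Unknown message type: {message.get('type')!r}")
            return False
        handler(message)
        return True
```

Heartbeats then become just another message type with their own handler, instead of being special-cased inside the request handler.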
So, as it stands, B has to be started first; then we wait 1-2 seconds (to make sure that `setup_zmq()` is done) and start A.
As you can see, the code is surely far more complicated than it needs to be and too tedious (using `time.sleep()` is surely a bad idea, and polling is probably unnecessary, although I didn't manage to make it work without it), and it is hard to extend to cover points 3. and 4.
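The `A_ready` flag in B is checked in a loop that spins until A's message arrives. A `threading.Event` lets the sending thread block until the listener thread marks the peer as ready, without sleeping or spinning. A minimal sketch; `peer_ready`, `on_message` and `send_when_ready` are hypothetical names, and the `"type"` field is an assumed message format. Note that this only removes the busy-wait: over PUB-SUB the "ready" message itself can still be lost if it is sent before the subscription is established, so the peer would need to repeat it (for example in every heartbeat) until it is seen.

```python
import threading

# Set by the listener thread once the peer has announced itself
peer_ready = threading.Event()


def on_message(message):
    """Runs in the listener thread for every decoded message."""
    if message.get("type") == "ready":
        peer_ready.set()


def send_when_ready(send, message, timeout=5.0):
    """Block until the peer is ready (or `timeout` seconds pass), then send.

    Returns True if the message was handed to `send`, False on timeout.
    """
    if not peer_ready.wait(timeout):
        return False
    send(message)
    return True
```

When heartbeats declare the peer dead, calling `peer_ready.clear()` puts senders back into the waiting state.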
Do you have any input on what could be improved here, and on how to solve the other issues discussed above?