I have a group of agents cooperating under either SelectorGroupChat and MagenticOneGroupChat (it doesn't matter which one, the result is the same). They run on a separate thread processing requests like this:
def run(self):
def process_loop(loop):
asyncio.set_event_loop(loop)
ic('Asking agent initial question', self._initial_request.data)
response = asyncio.run(self.process_signal(self._initial_request.data))
while response is not None:
ic('Agent response - sending', response)
self._producer.send(response)
next_request = self._consumer.poll()
signal = Signal.from_json(next_request)
ic('Agent next request - received', signal.data)
try:
response = asyncio.run(self.process_signal(signal.data))
ic('Agent next response - sending', response)
except Exception as e:
ic(e)
ic(response)
pass # ignore
loop = asyncio.get_event_loop()
self._thread = threading.Thread(target=lambda: process_loop(loop))
self._thread.daemon = True
ic('Starting receiver')
self._thread.start()
The call to process_signal
calls:
magentic.run_stream(task=signal_data)
(or same call for SelectorGroupChat).
This answers the first question without any problem. However, when it processes the second question, I get an exception:
e: RuntimeError('<Queue at 0x11739c830 maxsize=0 tasks=6> is bound to a different event loop')
The thread and the event loop are the same for the first and the second call - why am I seeing this exception?
I have a group of agents cooperating under either SelectorGroupChat and MagenticOneGroupChat (it doesn't matter which one, the result is the same). They run on a separate thread processing requests like this:
def run(self):
def process_loop(loop):
asyncio.set_event_loop(loop)
ic('Asking agent initial question', self._initial_request.data)
response = asyncio.run(self.process_signal(self._initial_request.data))
while response is not None:
ic('Agent response - sending', response)
self._producer.send(response)
next_request = self._consumer.poll()
signal = Signal.from_json(next_request)
ic('Agent next request - received', signal.data)
try:
response = asyncio.run(self.process_signal(signal.data))
ic('Agent next response - sending', response)
except Exception as e:
ic(e)
ic(response)
pass # ignore
loop = asyncio.get_event_loop()
self._thread = threading.Thread(target=lambda: process_loop(loop))
self._thread.daemon = True
ic('Starting receiver')
self._thread.start()
The call to process_signal
calls:
magentic.run_stream(task=signal_data)
(or same call for SelectorGroupChat).
This answers the first question without any problem. However, when it processes the second question, I get an exception:
e: RuntimeError('<Queue at 0x11739c830 maxsize=0 tasks=6> is bound to a different event loop')
The thread and the event loop are the same for the first and the second call - why am I seeing this exception?
Share Improve this question asked Mar 14 at 21:33 Al AAl A 2255 silver badges15 bronze badges1 Answer
Reset to default 0It looks like the problem was the two calls to asyncio.run(...)
. If I wrap the entire process loop with a single asyncio.run(...)
call and await
for the calls to process_signal
then everything works fine.