最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - Autogen SelectorGroupChat and MagenticOneGroupChat: Queue bound to a different loop when running on different thread -

programmeradmin2浏览0评论

I have a group of agents cooperating under either SelectorGroupChat and MagenticOneGroupChat (it doesn't matter which one, the result is the same). They run on a separate thread processing requests like this:

def run(self):
    def process_loop(loop):
        asyncio.set_event_loop(loop)
        ic('Asking agent initial question', self._initial_request.data)
        response = asyncio.run(self.process_signal(self._initial_request.data))
        while response is not None:
            ic('Agent response - sending', response)
            self._producer.send(response)
            next_request = self._consumer.poll()
            signal = Signal.from_json(next_request)
            ic('Agent next request - received', signal.data)
            try:
                response = asyncio.run(self.process_signal(signal.data))
                ic('Agent next response - sending', response)
            except Exception as e:
                ic(e)
                ic(response)
                pass  # ignore

    loop = asyncio.get_event_loop()
    self._thread = threading.Thread(target=lambda: process_loop(loop))
    self._thread.daemon = True
    ic('Starting receiver')
    self._thread.start()

The call to process_signal calls:

magentic.run_stream(task=signal_data) (or same call for SelectorGroupChat).

This answers the first question without any problem. However, when it processes the second question, I get an exception:

e: RuntimeError('<Queue at 0x11739c830 maxsize=0 tasks=6> is bound to a different event loop')

The thread and the event loop are the same for the first and the second call - why am I seeing this exception?

I have a group of agents cooperating under either SelectorGroupChat and MagenticOneGroupChat (it doesn't matter which one, the result is the same). They run on a separate thread processing requests like this:

def run(self):
    def process_loop(loop):
        asyncio.set_event_loop(loop)
        ic('Asking agent initial question', self._initial_request.data)
        response = asyncio.run(self.process_signal(self._initial_request.data))
        while response is not None:
            ic('Agent response - sending', response)
            self._producer.send(response)
            next_request = self._consumer.poll()
            signal = Signal.from_json(next_request)
            ic('Agent next request - received', signal.data)
            try:
                response = asyncio.run(self.process_signal(signal.data))
                ic('Agent next response - sending', response)
            except Exception as e:
                ic(e)
                ic(response)
                pass  # ignore

    loop = asyncio.get_event_loop()
    self._thread = threading.Thread(target=lambda: process_loop(loop))
    self._thread.daemon = True
    ic('Starting receiver')
    self._thread.start()

The call to process_signal calls:

magentic.run_stream(task=signal_data) (or same call for SelectorGroupChat).

This answers the first question without any problem. However, when it processes the second question, I get an exception:

e: RuntimeError('<Queue at 0x11739c830 maxsize=0 tasks=6> is bound to a different event loop')

The thread and the event loop are the same for the first and the second call - why am I seeing this exception?

Share Improve this question asked Mar 14 at 21:33 Al AAl A 2255 silver badges15 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

It looks like the problem was the two calls to asyncio.run(...). If I wrap the entire process loop with a single asyncio.run(...) call and await for the calls to process_signal then everything works fine.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论