Await queue.get застрял в пустой очереди - PullRequest
0 голосов
/ 07 мая 2020

У меня проблемы с очередями asyncio. Выполнение застревает на await queue.get(), если очередь пуста - даже если я публикую sh что-то в очереди.

У меня есть al oop, который читает очередь событий, которая запускается сразу после загрузки приложения. , поэтому очередь пуста при первом ожидании. В другой совместной подпрограмме я публикую sh сообщение в эту очередь, однако выполнение ожидает оператора ожидания. Очередь читает только один потребитель. Я публикую sh сообщение, используя put_nowait():

    async def _event_loop(self):
    while True:
        try:
            # if self.events.empty():
            #     await asyncio.sleep(0.1)
            #     continue
            ev = await self.events.get()
            print(ev)

Если я раскомментирую закомментированную часть, все начинает работать.

Я заметил похожую проблему здесь: https://github.com/mosquito/aio-pika/issues/56

Но мне не удалось выяснить, как это исправить.

Кто-нибудь знает, что не так?

1 Ответ

0 голосов
/ 07 мая 2020

Вы заполняете очередь из потока, отличного от того, который запускает событие l oop. По дизайну очереди asyncio не потокобезопасны и могут быть безопасно доступны только из сопрограмм asyncio и обратных вызовов.

Вы можете исправить проблему, изменив свой вызов на queue.put_nowait(elem), на что-то например, loop.call_soon_threadsafe(queue.put_nowait, elem), где loop - это объект события l oop, который вы также должны передать потоку, вероятно, так же, как вы передаете очередь.

зачем тогда раскомментированная часть кода исправить проблему?

Раскомментирование эффективно устраняет необходимость пробуждения сопрограммы во время ожидания в пустой очереди. Пробуждение не сработало, потому что put_nowait предполагает, что оно запускается из потока события l oop, и, следовательно, не нужно испускать дополнительный сигнал пробуждения. См., Например, этот ответ для подробностей.

...