Python 3.8 эхо-клиент websocket с очередью: asyncio.Queue get () не получает элементы очереди, добавленные на лету - PullRequest
1 голос
/ 26 апреля 2020

Я пытаюсь создать клиента, который использует asyncio.Queue для подачи сообщений, которые я хочу отправить на сервер. Получение данных с сервера websocket прекрасно работает. Отправка данных, которые только что сгенерированы производителем, тоже работает. Для объяснения того, что работает, а что нет, сначала вот мой код:

import sys
import asyncio
import websockets


class WebSocketClient:
    def __init__(self):
        self.send_queue = asyncio.Queue()
        #self.send_queue.put_nowait('test-message-1')

    async def startup(self):
        await self.connect_websocket()
        consumer_task = asyncio.create_task(
            self.consumer_handler()
        )
        producer_task = asyncio.create_task(
            self.producer_handler()
        )
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.ALL_COMPLETED
        )
        for task in pending:
            task.cancel()

    async def connect_websocket(self):
        try:
            self.connection = await websockets.client.connect('ws://my-server')
        except ConnectionRefusedError:
            sys.exit('error: cannot connect to backend')

    async def consumer_handler(self):
        async for message in self.connection:
            await self.consumer(message)

    async def consumer(self, message):
        self.send_queue.put_nowait(message)
        # await self.send_queue.put(message)
        print('mirrored message %s now in queue, queue size is %s' % (message, self.send_queue.qsize()))

    async def producer_handler(self):
        while True:
            message = await self.producer()
            await self.connection.send(message)

    async def producer(self):
        result = await self.send_queue.get()
        self.send_queue.task_done()
        #await asyncio.sleep(10)
        #result = 'test-message-2'
        return result


if __name__ == '__main__':
    wsc = WebSocketClient()
    asyncio.run(wsc.startup())

Подключение работает отлично. Если я отправляю что-то со своего сервера клиенту, это тоже прекрасно работает и печатает сообщение в consumer (). Но производитель никогда не получает никакого сообщения, которое я помещаю в send_queue внутри consumer().

. Причина, по которой я выбрал send_queue.put_nowait в consumer(), заключалась в том, что я хотел предотвратить взаимные блокировки. Если я использую строку await self.send_queue.put(message) line вместо self.send_queue.put_nowait(message), это не имеет значения.

Я подумал, что очередь может вообще не работать, поэтому я что-то заполнил в очередь только при создании в __init__(): self.send_queue.put_nowait("test-message-1"). Это работает и отправляется на мой сервер. Таким образом, базовая c концепция очереди и await queue.get() работает.

Я также подумал, может быть, есть какая-то проблема с производителем, поэтому давайте просто случайным образом генерируем сообщения во время выполнения: result = "test-message-2" вместо result = await self.send_queue.get(). Это тоже работает: каждые 10 секунд «тестовое сообщение-2» отправляется на мой сервер.

РЕДАКТИРОВАТЬ: Это также происходит, если я пытаюсь добавить материал из другого источника в очередь на лету. Я создаю небольшой сервер сокетов asyncio, который отправляет любое сообщение в очередь, что прекрасно работает, и вы можете видеть сообщения, которые я добавил из другого источника с qsize() в consumer(), но все еще безуспешно queue.get(). Таким образом, сама очередь, кажется, работает, но не get(). Это, кстати, и причина для очереди: я хотел бы отправить данные из совершенно разных источников.

Итак, это точка, где я застрял. Я предпочитаю, что очередь, которую я использую в producer(), отличается от consumer(), что довольно легко происходит при многопоточности, если вы используете не поточно-безопасные очереди, такие как asyncio.Queue, но, как я понял, я вообще не используйте многопоточность, только сопрограммы. Итак, что же здесь пошло не так?

Только для контекста: это Ubuntu 20.04 python 3.8.2 внутри docker контейнера.

Спасибо, Эрнесто

1 Ответ

0 голосов
/ 02 мая 2020

Только для записей - решение моей проблемы было довольно простым: я определил send_queue вне события l oop, созданного моим клиентом websocket. Поэтому он вызвал events.get_event_loop() и получил свой собственный l oop - который не был частью основного l oop и, следовательно, никогда не вызывался, поэтому await queue.get() действительно никогда ничего не получал.

В обычном режиме , вы не видите ни одного сообщения, которое является подсказкой к этой проблеме. Но python документация для спасения: конечно, они упомянули это в https://docs.python.org/3/library/asyncio-dev.html: logging.DEBUG дали подсказки, которые мне нужны, чтобы найти проблему.

...