Я пытаюсь создать клиента, который использует asyncio.Queue для подачи сообщений, которые я хочу отправить на сервер. Получение данных с сервера websocket прекрасно работает. Отправка данных, которые только что сгенерированы производителем, тоже работает. Для объяснения того, что работает, а что нет, сначала вот мой код:
import sys
import asyncio
import websockets
class WebSocketClient:
def __init__(self):
self.send_queue = asyncio.Queue()
#self.send_queue.put_nowait('test-message-1')
async def startup(self):
await self.connect_websocket()
consumer_task = asyncio.create_task(
self.consumer_handler()
)
producer_task = asyncio.create_task(
self.producer_handler()
)
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.ALL_COMPLETED
)
for task in pending:
task.cancel()
async def connect_websocket(self):
try:
self.connection = await websockets.client.connect('ws://my-server')
except ConnectionRefusedError:
sys.exit('error: cannot connect to backend')
async def consumer_handler(self):
async for message in self.connection:
await self.consumer(message)
async def consumer(self, message):
self.send_queue.put_nowait(message)
# await self.send_queue.put(message)
print('mirrored message %s now in queue, queue size is %s' % (message, self.send_queue.qsize()))
async def producer_handler(self):
while True:
message = await self.producer()
await self.connection.send(message)
async def producer(self):
result = await self.send_queue.get()
self.send_queue.task_done()
#await asyncio.sleep(10)
#result = 'test-message-2'
return result
if __name__ == '__main__':
wsc = WebSocketClient()
asyncio.run(wsc.startup())
Подключение работает отлично. Если я отправляю что-то со своего сервера клиенту, это тоже прекрасно работает и печатает сообщение в consumer (). Но производитель никогда не получает никакого сообщения, которое я помещаю в send_queue внутри consumer()
.
. Причина, по которой я выбрал send_queue.put_nowait в consumer()
, заключалась в том, что я хотел предотвратить взаимные блокировки. Если я использую строку await self.send_queue.put(message)
line вместо self.send_queue.put_nowait(message)
, это не имеет значения.
Я подумал, что очередь может вообще не работать, поэтому я что-то заполнил в очередь только при создании в __init__()
: self.send_queue.put_nowait("test-message-1")
. Это работает и отправляется на мой сервер. Таким образом, базовая c концепция очереди и await queue.get()
работает.
Я также подумал, может быть, есть какая-то проблема с производителем, поэтому давайте просто случайным образом генерируем сообщения во время выполнения: result = "test-message-2"
вместо result = await self.send_queue.get()
. Это тоже работает: каждые 10 секунд «тестовое сообщение-2» отправляется на мой сервер.
РЕДАКТИРОВАТЬ: Это также происходит, если я пытаюсь добавить материал из другого источника в очередь на лету. Я создаю небольшой сервер сокетов asyncio, который отправляет любое сообщение в очередь, что прекрасно работает, и вы можете видеть сообщения, которые я добавил из другого источника с qsize()
в consumer()
, но все еще безуспешно queue.get()
. Таким образом, сама очередь, кажется, работает, но не get()
. Это, кстати, и причина для очереди: я хотел бы отправить данные из совершенно разных источников.
Итак, это точка, где я застрял. Я предпочитаю, что очередь, которую я использую в producer()
, отличается от consumer()
, что довольно легко происходит при многопоточности, если вы используете не поточно-безопасные очереди, такие как asyncio.Queue, но, как я понял, я вообще не используйте многопоточность, только сопрограммы. Итак, что же здесь пошло не так?
Только для контекста: это Ubuntu 20.04 python 3.8.2 внутри docker контейнера.
Спасибо, Эрнесто