Независимо потребляющие сообщения websocket (Python) - PullRequest
1 голос
/ 05 апреля 2020

Есть ли способ дважды получать сообщения из соединения через веб-сокет, используя два отдельных цикла async for?

Запуск приведенного ниже кода дает RuntimeError: невозможно вызвать recv, пока другая сопрограмма уже ожидает следующее сообщение .

import websockets
import asyncio

async def foo(ws):
    async for msg in ws:
        print(f"foo: {msg}")


async def bar(ws):
    async for msg in ws:
        print(f"bar: {msg}")


async def main():
    async with websockets.connect("wss://echo.websocket.org") as ws:
        asyncio.create_task(foo(ws))
        asyncio.create_task(bar(ws))
        await ws.send("Hello")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

1 Ответ

1 голос
/ 06 апреля 2020

Вы можете создать функцию широковещания, которая считывает ws один раз и передает каждое сообщение нескольким генераторам. Например (не проверено):

def broadcast(stream, num):
    # iterate over the stream just once, but put each message into
    # multiple queues
    queues = []
    async def consume():
        async for msg in stream:
            for queue in queues:
                await queue.put(msg)
        for queue in queues:
            await queue.put(None)
    asyncio.create_task(consume())

    # create the queues and return the generators that transmit
    # their contents
    async def transmit(queue):
        while True:
            msg = await queue.get()
            if msg is None:
                break
            yield msg

    iters = []
    for _ in range(num):
        queue = asyncio.Queue()
        iters.append(transmit(queue))
        queues.append(queue)

    return iters

С этим на месте вы main() могли бы выглядеть так:

async def main():
    async with websockets.connect("wss://echo.websocket.org") as ws:
        foo_stream, bar_stream = broadcast(ws, 2)
        asyncio.create_task(foo(foo_stream))
        asyncio.create_task(bar(bar_stream))
        await ws.send("Hello")
...