Подписки и сопрограммы Websocket: Извлечение сообщения без выхода из l oop? - PullRequest
1 голос
/ 12 марта 2020

Я пытаюсь организовать свои подписки в реальном времени на API, вот что у меня есть:

async def real_time_info():
    channels = ['channel1']
    msg_subscription = msg.public_subscribe(channels)
    async with websockets.connect(URL) as websocket:
        await websocket.send(json.dumps(msg_subscription))
        while websocket.open:
            subscription = json.loads(await websocket.recv())
            return subscription #HERE IS THE PROBLEM I'M TRYING TO SOLVE


async def real_time_data():
    channels = ['channel2']
    msg_subscription = msg.public_subscribe(channels)
    async with websockets.connect(URL) as websocket:
        await websocket.send(json.dumps(msg_subscription))
        while websocket.open:
            subscription = json.loads(await websocket.recv())
            return subscription #HERE IS THE PROBLEM I'M TRYING TO SOLVE


async def main():
    info = asyncio.create_task(real_time_info())
    data = asyncio.create_task(real_time_data())

    while True:
        print(await info)
        print(await data)

asyncio.run(main())

Так что я хочу сделать так, чтобы каждый раз приходило сообщение от real_time_info() или real_time_data(), чтобы распечатать его. Проблема в том, что, поскольку я использую функцию return, она автоматически выходит из функции и, конечно, перестает получать сообщения из веб-сокета.

Технически я могу сделать это, поместив все в одну функцию и выполнив все внутри это без необходимости выхода из for l oop, но тогда это целый беспорядок и действительно трудно управлять сообщениями, которые я получаю из сокета.

Есть ли способ вывести эти сообщения сокета из моих real_time функций для main, хотя они все еще работают в al oop?

Спасибо!

Редактировать: Я также задам другие вопросы о websockets, которые могут сделать все это тривиальным. Прямо сейчас я подключаюсь к 2 каналам одного и того же API (channel1 и channel2) через 2 разные функции. Но поскольку в обоих случаях я пытаюсь прочитать сообщения с сервера через await websocket.recv(), эти сообщения все еще смешиваются в одном и том же «приемнике»? Если да, есть ли способ управлять ответами от серверов через веб-сокеты? Еще раз спасибо.

1 Ответ

1 голос
/ 12 марта 2020

Передайте экземпляр asyncio.Queue() вашим асин c функциям и замените return subscription на await queue.put(('channel1', subscription)). Тогда ваш main может выглядеть так:

async def main():
    queue = asyncio.Queue()
    info = asyncio.create_task(quote_info(queue))
    data = asyncio.create_task(trades_info(queue))

    while True:
        channel, subscription = await queue.get()
        print(channel, subscription)
...