Отправка и получение фреймов через одно и то же соединение через веб-сокет без блокировки - PullRequest
0 голосов
/ 04 октября 2018

Извините за длинный пост, но я копался в этом больше недели, поэтому я пробовал много разных вещей.Я достаточно хорошо знаю Python, но у меня нет опыта работы с асинхронными или неблокирующими функциями в Python.

Я пишу библиотеку API / модуль / пакет / что угодно для веб-службы, для которой требуется веб-сокетподключение.Есть много входящих сообщений, на которые нужно воздействовать, и некоторые связанные с управлением (уровень веб-приложения, а не управляющие сообщения веб-сокета), которые мне нужно отправлять при необходимости.Я могу легко получать сообщения через соединение и действовать на них.Я могу отправлять сообщения, но только в ответ на полученные сообщения, потому что цикл приема всегда блокирует ожидание сообщений.Я не хочу ждать, пока входящие сообщения обработают исходящее, поэтому сценарий не должен зависать на входе, пока не будут получены новые сообщения.В своей борьбе за то, чтобы двусторонняя связь работала так, как хотелось бы, я обнаружил, что мне нужно использовать что-то вроде Twisted, Tornado или asyncio, но до сих пор все попытки, которые я пробовал, терпели неудачу.Обратите внимание, что отправка должна происходить через одно и то же соединение.Открытие кратковременного соединения вне цикла приема не будет работать.Вот что я сделал до сих пор:


Первая итерация кода веб-сокета использовала пакет websocket-client .Это было очень близко к примеру из документов:

import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

def on_message(ws, message):
    # Send message frames to respective functions
    # for sorting, objectification, and processing

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    def run(*args):
        # Send initial frames required for server to send the desired frames
    thread.start_new_thread(run, ())


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(buildWebsocketURL()),
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

Это блокирует любое дальнейшее выполнение вне цикла.Я попытался изучить модуль _thread, но не смог найти никаких указаний на то, что могу «общаться» с потоком веб-сокетов извне.Я попытался настроить функцию pub / sub слушателя, которая будет перенаправлять данные в ws.send() от другой функции отправителя, но это не сработало.Никаких ошибок или чего-либо, просто никаких указаний на отправленные сообщения.


Далее я попробовал модуль Websockets .Этот, кажется, построен с нуля, чтобы использовать asyncio.Опять же, я получил клиентскую сборку, которая отправляла бы исходные сообщения и действовала на полученных сообщениях, но прогресс на этом остановился:

async def wsconnection():
        async with websockets.connect(getWebsocketURL()) as websocket:
            while True:
                message = await websocket.recv()
                if message == '{"type":"broadcaster.ready"}':
                    subscriptions = getSubscriptions()  # Get subscriptions from ident data
                    logging.info('Sending bookmarks to server as subscription keys')
                    subscriptionupdate = '{{"type": "subscribe","subscription_keys": ["{0}"],"subscription_scope": "update"}}'.format(
                        '","'.join(subscriptions))
                    subscriptioncontent = '{{"subscription_keys": ["{0}"],"subscription_scope": "content","type": "subscribe"}}'.format(
                        '","'.join(subscriptions))
                    logging.debug(subscriptioncontent)
                    await websocket.send(subscriptionupdate)
                    await websocket.send(subscriptioncontent)
                    await websocket.send(
                        '{"type":"message_lobby.read","lobby_id":"1","message_id:"16256829"}')
                sortframe(message) 

asyncio.get_event_loop().run_until_complete(wsconnection())

Я попытался использовать вышеупомянутый обработчик pub / sub, но безрезультатно.После более полного прочтения документации по этому модулю я попытался получить объект протокола websocket (который содержит методы send () и recv ()) вне цикла, а затем создал две сопрограммы (?), Одну для прослушивания входящих сообщений и одну для прослушиванияи отправка исходящих сообщений.До сих пор я не мог получить объект протокола websocket без запуска строки async with websockets.connect(getWebsocketURL()) as websocket: в рамках функции wsconnection ().Я попытался использовать websocket = websockets.client.connect(), который в соответствии с документами , я думал, установит нужный мне объект протокола, но это не так.Кажется, что все примеры, которые я могу найти, не раскрывают какой-либо очевидный способ структурировать отправителя и получателя веб-сокетов так, как мне требуется, без обширных знаний asyncio.


Я также возился с autobahn со схожей структурой кода, как указано выше, используя asyncio и Twisted, но у меня возникли все те же проблемы, что и выше.

До сих пор ближе всего я был с пакетом Websockets выше.В документах есть пример фрагмента для соединения send / recv, но я не могу прочитать, что там происходит, поскольку все это очень специфично для asyncio.У меня действительно возникают проблемы с тем, чтобы обернуть голову вокруг asyncio в целом, и я думаю, что большая проблема заключается в том, что в последнее время она очень быстро развивается, поэтому вокруг этой информации витает тонна очень специфичной для версии информации.К сожалению, не подходит для обучения.~~~~ Это то, что я пытался использовать в этом примере, и он подключается, получает исходные сообщения, затем соединение теряется / закрывается:

async def producer(message):
    print('Sending message')

async def consumer_handler(websocket, path):
    while True:
        message = await websocket.recv()
        await print(message)
        await pub.sendMessage('sender', message)

async def producer_handler(websocket, path):
    while True:
        message = await producer()
        await websocket.send(message)

async def wsconnect():
        async with websockets.connect(getWebsocketURL()) as websocket:
            path = "443"
            async def handler(websocket, path):
                consumer_task = asyncio.ensure_future(
                    consumer_handler(websocket, path))
                producer_task = asyncio.ensure_future(
                    producer_handler(websocket, path))
                done, pending = await asyncio.wait(
                    [consumer_task, producer_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()

pub.subscribe(producer, 'sender')

asyncio.get_event_loop().run_until_complete(wsconnect())

Так как мне структурировать этот код, чтобы получать и отправлять черезтакое же соединение с веб-сокетом?У меня также есть различные вызовы API для одного и того же сценария, когда соединение с websocket открыто, что еще больше усложняет ситуацию.

Я использую Python 3.6.6, и этот сценарий предназначен для импорта в качестве модуля в другие сценарии, поэтому функциональность веб-сокета должна быть заключена в функцию или класс для внешних вызовов.

...