Клиент веб-сокета Autobahn в приложении Quart (async Flask) - PullRequest
1 голос
/ 28 марта 2019

Добрый вечер всем.Я не совсем новичок в этом месте, но, наконец, решил зарегистрироваться и попросить о помощи.Я разрабатываю веб-приложение, используя Quart Framework (асинхронный Flask).И теперь, когда приложение стало больше и сложнее, я решил разделить разные процедуры для разных экземпляров сервера, в основном потому, что я хочу поддерживать веб-сервер чистым, более абстрактным и свободным от вычислительной нагрузки.
Поэтому я планирую использовать один вебсервер с несколькими (при необходимости) одинаковыми серверами процедур.Все серверы основаны на Quart Framework, на данный момент только для простоты разработки.Я решил использовать Crossbar.io роутер и автобан для соединения всех серверов вместе.

И тут возникла проблема.Я следовал за этими сообщениями:

Запуск нескольких ApplicationSessions без блокировки с использованием autbahn.asyncio.wamp

Как я могу реализовать интерактивный клиент веб-сокетов с автобаном asyncio?

Как я могу интегрировать кросс-клиент (python3, asyncio) с tkinter

Как отправить сообщение Autobahn / Twisted WAMP извне протокола?

Похоже, я перепробовал все возможные подходы к реализации клиента websocket с автобаном в моем приложении quart.Я не знаю, как сделать так, чтобы обе вещи работали, независимо от того, работает ли приложение Quart, но WS-клиент Autobahn не работает, или наоборот.

Упрощенное мое приложение Quart выглядит так:

from quart import Quart, request, current_app
from config import Config
# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

import concurrent.futures

class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as a RPC endpoint
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is an Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is an Failure instance
                print("Failed to register procedure: {}".format(res))

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    print ("before autobahn start")
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        runner = ApplicationRunner('ws://127.0.0.1:8080 /ws', 'realm1')
        future = executor.submit(runner.run(Component))
    print ("after autobahn started")

    return app


from app import models

В этом случае приложение застревает в цикле бегунка, и все приложение не работает (не может обслуживать запросы), это становится возможным, только если я прерываю цикл бегунов (автобан) с помощью Ctrl-C.

CMDпосле запуска:

(quart-app) user@car:~/quart-app$ hypercorn --debug --error-log - --access-log - -b 0.0.0.0:8001 tengine:app
Running on 0.0.0.0:8001 over http (CTRL + C to quit)
before autobahn start
Ok, registered procedure with registration ID 4605315769796303

после нажатия ctrl-C:

...
^Cafter autobahn started
2019-03-29T01:06:52 <Server sockets=[<socket.socket fd=11, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]> is serving

Как сделать так, чтобы кварт-приложение работало вместе с клиентом Autobahn неблокирующим образом?Таким образом, автобан открывает и сохраняет соединение через веб-сокет с маршрутизатором Crossbar и тихо слушает в фоновом режиме.

1 Ответ

0 голосов
/ 30 марта 2019

Ну, после многих бессонных ночей я наконец нашел хороший подход к решению этой головоломки.

Благодаря этому сообщению C-Python asyncio: запуск discord.py в потоке

Итак, я переписал свой код следующим образом и смог запустить приложение Quart с клиентом Autobahn внутри, и оба активно работают неблокирующим образом. Весь __init__.py выглядит так:

from quart import Quart, request, current_app
from config import Config


def create_app(config_class=Config):

    app = Quart(__name__)
    app.config.from_object(config_class)

    # Blueprint registration
    from app.main import bp as main_bp
    app.register_blueprint(main_bp)

    return app


# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import threading


class Component(ApplicationSession):
    """
    An application component registering RPC endpoints using decorators.
    """

    async def onJoin(self, details):

        # register all methods on this object decorated with "@wamp.register"
        # as a RPC endpoint
        ##
        results = await self.register(self)
        for res in results:
            if isinstance(res, wamp.protocol.Registration):
                # res is an Registration instance
                print("Ok, registered procedure with registration ID {}".format(res.id))
            else:
                # res is an Failure instance
                print("Failed to register procedure: {}".format(res))


    def onDisconnect(self):
        print('Autobahn disconnected')

    @wamp.register(u'com.mathservice.add2')
    def add2(self, x, y):
        return x + y


async def start():
    runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
    await runner.run(Component) # use client.start instead of client.run

def run_it_forever(loop):
    loop.run_forever()

asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
loop = asyncio.get_event_loop()
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

В этом сценарии мы создаем задачу с помощью runner.run из autobahn и присоединяем ее к текущему циклу, а затем выполняем этот цикл навсегда в новом потоке.

Я был вполне удовлетворен текущим решением .... но затем выяснилось, что у этого решения есть некоторые недостатки, которые были для меня критически важны, например: переподключение, если соединение разорвано (т. Е. Маршрутизатор с перекладиной становится недоступным). При таком подходе, если соединение не удалось инициализировать или сбросить через некоторое время, оно не будет пытаться восстановить соединение. Кроме того, для меня не было очевидно, как использовать API-интерфейс ApplicationSession, то есть зарегистрировать / вызвать RPC из кода в моем приложении Quart.

К счастью, я обнаружил еще один новый API-компонент, который autobahn использовал в их документации: https://autobahn.readthedocs.io/en/latest/wamp/programming.html#registering-procedures https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py

Он имеет функцию автоматического переподключения, и легко зарегистрировать функции для RPC с помощью декораторов @component.register('com.something.do'), вам просто нужно import component перед этим.

Итак, вот окончательный вид решения __init__.py:

from quart import Quart, request, current_app
from config import Config

def create_app(config_class=Config):
    ...
    return app

from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
import threading


component = Component(
    transports=[
        {
            "type": "websocket",
            "url": u"ws://localhost:8080/ws",
            "endpoint": {
                "type": "tcp",
                "host": "localhost",
                "port": 8080,
            },
            "options": {
                "open_handshake_timeout": 100,
            }
        },
    ],
    realm=u"realm1",
)

@component.on_join
def join(session, details):
    print("joined {}".format(details))

async def start():
    await component.start() #used component.start() instead of run([component]) as it's async function

def run_it_forever(loop):
    loop.run_forever()

loop = asyncio.get_event_loop()
#asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
asyncio.get_child_watcher().attach_loop(loop)
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")


from app import models

Надеюсь, это кому-нибудь поможет. Ура!

...