Ну, после многих бессонных ночей я наконец нашел хороший подход к решению этой головоломки.
Благодаря этому сообщению C-Python asyncio: запуск discord.py в потоке
Итак, я переписал свой код следующим образом и смог запустить приложение Quart с клиентом Autobahn внутри, и оба активно работают неблокирующим образом.
Весь __init__.py
выглядит так:
from quart import Quart, request, current_app
from config import Config
def create_app(config_class=Config):
app = Quart(__name__)
app.config.from_object(config_class)
# Blueprint registration
from app.main import bp as main_bp
app.register_blueprint(main_bp)
return app
# Autobahn
import asyncio
from autobahn import wamp
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
import threading
class Component(ApplicationSession):
"""
An application component registering RPC endpoints using decorators.
"""
async def onJoin(self, details):
# register all methods on this object decorated with "@wamp.register"
# as a RPC endpoint
##
results = await self.register(self)
for res in results:
if isinstance(res, wamp.protocol.Registration):
# res is an Registration instance
print("Ok, registered procedure with registration ID {}".format(res.id))
else:
# res is an Failure instance
print("Failed to register procedure: {}".format(res))
def onDisconnect(self):
print('Autobahn disconnected')
@wamp.register(u'com.mathservice.add2')
def add2(self, x, y):
return x + y
async def start():
runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm1')
await runner.run(Component) # use client.start instead of client.run
def run_it_forever(loop):
loop.run_forever()
asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
loop = asyncio.get_event_loop()
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")
from app import models
В этом сценарии мы создаем задачу с помощью runner.run из autobahn и присоединяем ее к текущему циклу, а затем выполняем этот цикл навсегда в новом потоке.
Я был вполне удовлетворен текущим решением .... но затем выяснилось, что у этого решения есть некоторые недостатки, которые были для меня критически важны, например: переподключение, если соединение разорвано (т. Е. Маршрутизатор с перекладиной становится недоступным). При таком подходе, если соединение не удалось инициализировать или сбросить через некоторое время, оно не будет пытаться восстановить соединение. Кроме того, для меня не было очевидно, как использовать API-интерфейс ApplicationSession, то есть зарегистрировать / вызвать RPC из кода в моем приложении Quart.
К счастью, я обнаружил еще один новый API-компонент, который autobahn использовал в их документации:
https://autobahn.readthedocs.io/en/latest/wamp/programming.html#registering-procedures
https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py
Он имеет функцию автоматического переподключения, и легко зарегистрировать функции для RPC с помощью декораторов @component.register('com.something.do')
, вам просто нужно import component
перед этим.
Итак, вот окончательный вид решения __init__.py
:
from quart import Quart, request, current_app
from config import Config
def create_app(config_class=Config):
...
return app
from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
import threading
component = Component(
transports=[
{
"type": "websocket",
"url": u"ws://localhost:8080/ws",
"endpoint": {
"type": "tcp",
"host": "localhost",
"port": 8080,
},
"options": {
"open_handshake_timeout": 100,
}
},
],
realm=u"realm1",
)
@component.on_join
def join(session, details):
print("joined {}".format(details))
async def start():
await component.start() #used component.start() instead of run([component]) as it's async function
def run_it_forever(loop):
loop.run_forever()
loop = asyncio.get_event_loop()
#asyncio.get_child_watcher() # I still don't know if I need this method. It works without it.
asyncio.get_child_watcher().attach_loop(loop)
loop.create_task(start())
print('Starting thread for Autobahn...')
thread = threading.Thread(target=run_it_forever, args=(loop,))
thread.start()
print ("Thread for Autobahn has been started...")
from app import models
Надеюсь, это кому-нибудь поможет. Ура!