Я немного новичок в asyncio, когда пытаюсь запустить брокер сообщений HBMQTT внутри потока. В руководстве приведен следующий пример запуска брокера:
import asyncio
import os
from hbmqtt.broker import Broker
@asyncio.coroutine
def broker_coro():
broker = Broker()
yield from broker.start()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(broker_coro())
asyncio.get_event_loop().run_forever()
Из-за архитектуры, от которой я зависел, брокер должен запускаться в потоке. К сожалению, следующий пример basi c аварийно завершает работу, прежде чем поток вызывает run()
.
import asyncio
from threading import Thread
from hbmqtt.broker import Broker
class ExampleThread(Thread):
def __init__(self):
super().__init__()
self.daemon = True
self.config = {
'listeners': {
'default': {
'max-connections': 50000,
'bind': 'localhost:1883',
'type': 'tcp',
},
},
'auth': {
'allow-anonymous': True,
},
'plugins': [ 'auth_anonymous' ],
'topic-check': {
'enabled': False
}
}
self.loop = None
self.broker = None
@asyncio.coroutine
def broker_coroutine(self):
self.broker = Broker(self.config, self.loop)
yield from self.broker.start()
return self.broker
def run(self) -> None:
print('running ...')
self.loop.run_forever()
self.loop.run_until_complete(self.broker.shutdown())
self.loop.close()
def start(self):
print('starting thread ...')
self.loop = asyncio.new_event_loop()
print('starting server ...')
try:
start_server = asyncio.gather(self.broker_coroutine(),
loop=self.loop)
self.loop.run_until_complete(start_server)
broker = start_server.result()[0]
except:
print(traceback.format_exc())
self.loop.close()
super().start()
if __name__ == '__main__':
thread = ExampleThread()
thread.start()
При запуске примера выдается следующее исключение:
$ python3.7 ./mqtt.py
starting thread ...
starting server ...
Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at venv/lib/python3.7/site-packages/hbmqtt/broker.py:696> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x8024e7350>()]>>
Exception ignored in: <generator object Broker._broadcast_loop at 0x8027d1e50>
Traceback (most recent call last):
File "venv/lib/python3.7/site-packages/hbmqtt/broker.py", line 696, in _broadcast_loop
File "/usr/local/lib/python3.7/asyncio/queues.py", line 161, in get
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 687, in call_soon
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 479, in _check_closed
RuntimeError: Event loop is closed
Может кто-нибудь объяснить, что вызвало событие l oop будет закрыто? Если вместо этого я запускаю простой тестовый сопрограмму, он работает:
async def test_coroutine(self):
while True:
await asyncio.sleep(1)
print('hey!')