Запуск Python 3 asyncio.coroutine внутри потока навсегда - PullRequest
0 голосов
/ 17 марта 2020

Я немного новичок в asyncio, когда пытаюсь запустить брокер сообщений HBMQTT внутри потока. В руководстве приведен следующий пример запуска брокера:

import asyncio
import os
from hbmqtt.broker import Broker

@asyncio.coroutine
def broker_coro():
    broker = Broker()
    yield from broker.start()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(broker_coro())
    asyncio.get_event_loop().run_forever()

Из-за архитектуры, от которой я зависел, брокер должен запускаться в потоке. К сожалению, следующий пример basi c аварийно завершает работу, прежде чем поток вызывает run().

import asyncio
from threading import Thread
from hbmqtt.broker import Broker

class ExampleThread(Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.config = {
            'listeners': {
                'default': {
                    'max-connections': 50000,
                    'bind': 'localhost:1883',
                    'type': 'tcp',
                },
            },
            'auth': {
                'allow-anonymous': True,
            },
            'plugins': [ 'auth_anonymous' ],
            'topic-check': {
                'enabled': False
            }
        }
        self.loop = None
        self.broker = None

    @asyncio.coroutine
    def broker_coroutine(self):
        self.broker = Broker(self.config, self.loop)
        yield from self.broker.start()
        return self.broker

    def run(self) -> None:
        print('running ...')
        self.loop.run_forever()
        self.loop.run_until_complete(self.broker.shutdown())
        self.loop.close()

    def start(self):
        print('starting thread ...')
        self.loop = asyncio.new_event_loop()
        print('starting server ...')
        try:
            start_server = asyncio.gather(self.broker_coroutine(),
                                          loop=self.loop)
            self.loop.run_until_complete(start_server)
            broker = start_server.result()[0]
        except:
            print(traceback.format_exc())
            self.loop.close()

        super().start()


if __name__ == '__main__':
    thread = ExampleThread()
    thread.start()

При запуске примера выдается следующее исключение:

$ python3.7 ./mqtt.py
starting thread ...
starting server ...
Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at venv/lib/python3.7/site-packages/hbmqtt/broker.py:696> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x8024e7350>()]>>
Exception ignored in: <generator object Broker._broadcast_loop at 0x8027d1e50>
Traceback (most recent call last):
  File "venv/lib/python3.7/site-packages/hbmqtt/broker.py", line 696, in _broadcast_loop
  File "/usr/local/lib/python3.7/asyncio/queues.py", line 161, in get
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 687, in call_soon
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 479, in _check_closed
RuntimeError: Event loop is closed

Может кто-нибудь объяснить, что вызвало событие l oop будет закрыто? Если вместо этого я запускаю простой тестовый сопрограмму, он работает:

async def test_coroutine(self):
    while True:
        await asyncio.sleep(1)
        print('hey!')  

1 Ответ

0 голосов
/ 18 марта 2020

Мне не удалось запустить asyncio.coroutine внутри класса Thread, но вместо этого в выделенном потоке, как в этом примере:

import asyncio
from threading import Thread
from hbmqtt.broker import Broker

class Server():

    def __init__(self):
        self.broker = None
        self.config = {
            'listeners': {
                'default': {
                    'max-connections': 50000,
                    'bind': '127.0.0.1:1883',
                    'type': 'tcp',
                },
            },
            'plugins': [ 'auth_anonymous' ],
            'auth': {
                'allow-anonymous': True,
            },
            'topic-check': {
                'enabled': True,
                'plugins': ['topic_taboo'],
            },
        }

    async def broker_coroutine(self, config, loop):
        self.broker = Broker(config, loop)
        await self.broker.start()

    def start(self):
        loop = asyncio.new_event_loop()
        thread = Thread(target=lambda: self.run(loop))
        thread.start()

    def run(self, loop):
        try:
            future = asyncio.gather(self.broker_coroutine(self.config, loop),
                                    loop=loop,
                                    return_exceptions=True)
            loop.run_until_complete(future)
            loop.run_forever()
        except (Exception, KeyboardInterrupt):
            loop.close()
        finally:
            loop.run_until_complete(self.broker.shutdown())
            loop.close()

if __name__ == '__main__':
    server = Server()
    server.start()
...