Использование asyncio с рутиной блокировки и жесткой петлей - PullRequest
0 голосов
/ 01 апреля 2020

Я пытаюсь понять asyncio и то, как я буду использовать его в контексте более широкого применения моего дизайна. Это приложение использует библиотеку для получения событий низкого уровня, и у меня есть для нее драйвер, который предоставляет метод get_event(). Мое приложение имеет следующие методы и вспомогательную функцию

class Application:
    def exec_(self):
        asyncio.run(self._start_loops())

    async def _start_loops(self):
        await asyncio.gather(
            asyncio.create_task(self._event_loop()),
            asyncio.create_task(self._driver_loop())
        )

    async def _event_loop(self):
        while True:
            event = await self._event_queue.get()
            self.dispatch_event(event)

    async def _driver_loop(self):
        loop = asyncio.get_event_loop()
        while True:
            await loop.run_in_executor(None, _driver_get_event, self._driver, self._event_queue)

    def dispatch_event(self, event):
        print("event dispatched ", event)


def _driver_get_event(driver, queue):
    ev = driver.get_event()
    queue.put_nowait(ev)

, теперь это технически работает, но я довольно скептически. Причина в том, что я постоянно создаю новые потоки (исполнитель является исполнителем пула потоков) для _driver_get_event, чтобы я мог собрать новое событие, а затем я использую asyncio.Queue.put_nowait () из этого недолговечного вторичного потока. Этот вторичный поток умирает и не возрождается снова, пока driver_l oop не получит новый шанс порождать новый после того, как управление будет возвращено в ожидании.

Теперь, если бы я делал этот код по-старому, У меня была бы очередь. У очереди был бы долгоживущий вторичный поток while True, единственная роль которого - получить событие от драйвера, а pu sh - в очереди, а основной поток - в очереди. Вопрос. Даже если обработка события займет много времени, вторичный поток будет продолжать получать события и помещать их в очередь, а основной поток будет обрабатывать их в свое время.

Обратите внимание, что я пытался сделать

    async def _driver_loop(self):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, _driver_get_event, self._driver, self._event_queue)

    def dispatch_event(self, event):
        print("event dispatched ", event)


def _driver_get_event(driver, queue):
    while True:
        ev = driver.get_event()
        queue.put_nowait(ev)

, но это не может работать. Задача run_in_executor никогда не завершится sh, поэтому никогда не будет возможности передать управление sh другой задаче.

Как правильно использовать asyncio для этого сценария использования?

...