Я пытаюсь понять asyncio и то, как я буду использовать его в контексте более широкого применения моего дизайна. Это приложение использует библиотеку для получения событий низкого уровня, и у меня есть для нее драйвер, который предоставляет метод get_event()
. Мое приложение имеет следующие методы и вспомогательную функцию
class Application:
def exec_(self):
asyncio.run(self._start_loops())
async def _start_loops(self):
await asyncio.gather(
asyncio.create_task(self._event_loop()),
asyncio.create_task(self._driver_loop())
)
async def _event_loop(self):
while True:
event = await self._event_queue.get()
self.dispatch_event(event)
async def _driver_loop(self):
loop = asyncio.get_event_loop()
while True:
await loop.run_in_executor(None, _driver_get_event, self._driver, self._event_queue)
def dispatch_event(self, event):
print("event dispatched ", event)
def _driver_get_event(driver, queue):
ev = driver.get_event()
queue.put_nowait(ev)
, теперь это технически работает, но я довольно скептически. Причина в том, что я постоянно создаю новые потоки (исполнитель является исполнителем пула потоков) для _driver_get_event, чтобы я мог собрать новое событие, а затем я использую asyncio.Queue.put_nowait () из этого недолговечного вторичного потока. Этот вторичный поток умирает и не возрождается снова, пока driver_l oop не получит новый шанс порождать новый после того, как управление будет возвращено в ожидании.
Теперь, если бы я делал этот код по-старому, У меня была бы очередь. У очереди был бы долгоживущий вторичный поток while True
, единственная роль которого - получить событие от драйвера, а pu sh - в очереди, а основной поток - в очереди. Вопрос. Даже если обработка события займет много времени, вторичный поток будет продолжать получать события и помещать их в очередь, а основной поток будет обрабатывать их в свое время.
Обратите внимание, что я пытался сделать
async def _driver_loop(self):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _driver_get_event, self._driver, self._event_queue)
def dispatch_event(self, event):
print("event dispatched ", event)
def _driver_get_event(driver, queue):
while True:
ev = driver.get_event()
queue.put_nowait(ev)
, но это не может работать. Задача run_in_executor никогда не завершится sh, поэтому никогда не будет возможности передать управление sh другой задаче.
Как правильно использовать asyncio для этого сценария использования?