Я пытаюсь извлечь задачи из асинхронной очереди и вызвать данный обработчик ошибок, если произошло исключение.Элементы в очереди представлены в виде словарей (поставленных в очередь enqueue_task
), которые содержат задачу, возможный обработчик ошибок и любые аргументы / kwargs, которые могут потребоваться обработчику ошибок.Так как я хотел бы обрабатывать любые ошибки по мере выполнения задач, я сопоставляю каждую задачу со словарем, в котором отсутствует очередь, и пытаюсь получить к нему доступ в случае возникновения исключения.
async def _check_tasks(self):
try:
while self._check_tasks_task or not self._check_task_queue.empty():
tasks = []
details = {}
try:
while len(tasks) < self._CHECK_TASKS_MAX_COUNT:
detail = self._check_task_queue.get_nowait()
task = detail['task']
tasks.append(task)
details[task] = detail
except asyncio.QueueEmpty:
pass
if tasks:
for task in asyncio.as_completed(tasks):
try:
await task
except Exception as e:
logger.exception('')
detail = details[task]
error_handler = detail.get('error_handler')
error_handler_args = detail.get('error_handler_args', [])
error_handler_kwargs = detail.get('error_handler_kwargs', {})
if error_handler:
logger.info('calling error handler')
if inspect.iscoroutinefunction(error_handler):
self.enqueue_task(
task=error_handler(
e,
*error_handler_args,
**error_handler_kwargs
)
)
else:
error_handler(e, *error_handler_args, **error_handler_kwargs)
else:
logger.exception(f'Exception encountered while handling task: {str(e)}')
else:
await asyncio.sleep(self._QUEUE_EMPTY_SLEEP_TIME)
except:
logger.exception('')
def enqueue_task(self, task, error_handler=None, error_handler_args=[],
error_handler_kwargs={}):
if not asyncio.isfuture(task):
task = asyncio.ensure_future(task)
self._app.gateway._check_task_queue.put_nowait({
'task': task,
'error_handler': error_handler,
'error_handler_args': error_handler_args,
'error_handler_kwargs': error_handler_kwargs,
})
Однако, когда возникает исключение, оно появляетсязадача, используемая в качестве ключа, не найдена в словаре details
, и я получаю следующую ошибку:
KeyError: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Exception encountered while handling task: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Traceback (most recent call last):
File "/app/app/gateway/gateway.py", line 64, in _check_tasks
detail = details[task]
KeyError: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Когда task
дает asyncio.as_completed
, это похоже на генератор
<generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
, когда я ожидаю, что это будет задача
<Task pending coro=<GatewayL1Component._save_tick_to_stream() running at /app/app/gateway/l1.py:320> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc2d4380d98>()]>>
Почему task
является генератором, а не исходной задачей после получения asyncio.as_complete
?Есть ли способ получить доступ к исходному заданию?