Есть ли способ получить доступ к исходной задаче, переданной asyncio.as_completed? - PullRequest
1 голос
/ 04 апреля 2019

Я пытаюсь извлечь задачи из асинхронной очереди и вызвать данный обработчик ошибок, если произошло исключение.Элементы в очереди представлены в виде словарей (поставленных в очередь enqueue_task), которые содержат задачу, возможный обработчик ошибок и любые аргументы / kwargs, которые могут потребоваться обработчику ошибок.Так как я хотел бы обрабатывать любые ошибки по мере выполнения задач, я сопоставляю каждую задачу со словарем, в котором отсутствует очередь, и пытаюсь получить к нему доступ в случае возникновения исключения.

async def _check_tasks(self):
    try:
        while self._check_tasks_task or not self._check_task_queue.empty():
            tasks = []
            details = {}
            try:
                while len(tasks) < self._CHECK_TASKS_MAX_COUNT:
                    detail = self._check_task_queue.get_nowait()
                    task = detail['task']
                    tasks.append(task)
                    details[task] = detail
            except asyncio.QueueEmpty:
                pass

            if tasks:
                for task in asyncio.as_completed(tasks):
                    try:
                        await task
                    except Exception as e:
                        logger.exception('')
                        detail = details[task]
                        error_handler = detail.get('error_handler')
                        error_handler_args = detail.get('error_handler_args', [])
                        error_handler_kwargs = detail.get('error_handler_kwargs', {})

                        if error_handler:
                            logger.info('calling error handler')
                            if inspect.iscoroutinefunction(error_handler):
                                self.enqueue_task(
                                    task=error_handler(
                                        e,
                                        *error_handler_args,
                                        **error_handler_kwargs
                                    )
                                )
                            else:
                                error_handler(e, *error_handler_args, **error_handler_kwargs)
                        else:
                            logger.exception(f'Exception encountered while handling task: {str(e)}')
            else:
                await asyncio.sleep(self._QUEUE_EMPTY_SLEEP_TIME)
    except:
        logger.exception('')


def enqueue_task(self, task, error_handler=None, error_handler_args=[],
                 error_handler_kwargs={}):
    if not asyncio.isfuture(task):
        task = asyncio.ensure_future(task)

    self._app.gateway._check_task_queue.put_nowait({
        'task': task,
        'error_handler': error_handler,
        'error_handler_args': error_handler_args,
        'error_handler_kwargs': error_handler_kwargs,
    })

Однако, когда возникает исключение, оно появляетсязадача, используемая в качестве ключа, не найдена в словаре details, и я получаю следующую ошибку:

KeyError: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Exception encountered while handling task: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Traceback (most recent call last):
  File "/app/app/gateway/gateway.py", line 64, in _check_tasks
    detail = details[task]
KeyError: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>

Когда task дает asyncio.as_completed, это похоже на генератор

<generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>

, когда я ожидаю, что это будет задача

<Task pending coro=<GatewayL1Component._save_tick_to_stream() running at /app/app/gateway/l1.py:320> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc2d4380d98>()]>>

Почему task является генератором, а не исходной задачей после получения asyncio.as_complete?Есть ли способ получить доступ к исходному заданию?

Ответы [ 2 ]

1 голос
/ 05 апреля 2019

Почему задание является генератором, а не исходным заданием после получения asyncio.as_complete?

Проблема в том, что as_completed не является асинхронным итератором (который вы бы исчерпали async for), но обычный итератор.Если асинхронный итератор __aiter__ может приостановить работу в ожидании асинхронного события, обычный итератор __iter__ должен немедленно предоставить результат.Очевидно, что он не может привести к завершенной задаче, поскольку ни у одной задачи еще не было времени на завершение, поэтому он возвращает ожидаемый объект, который на самом деле ожидает завершения задачи.Это объект, который выглядит как генератор.

Как еще одно следствие реализации, ожидание этой задачи дает вам результат исходной задачи, а не ссылку на объект задачи - в отличие от исходного concurrent.futures.as_completed.Это делает asyncio.as_completed менее интуитивно понятным и трудным в использовании, а есть отчет об ошибках , в котором утверждается, что as_completed также должен использоваться в качестве асинхронного итератора, обеспечивая правильную семантику.(Это можно сделать обратно-совместимым способом.)

Есть ли способ получить доступ к исходному заданию?

В качестве обходного пути вы можете создать асинхронныйверсия as_completed, заключив исходную задачу в сопрограмму, которая завершается, когда задача выполняется, и имеет задачу в качестве результата:

async def as_completed_async(futures):
    loop = asyncio.get_event_loop()
    wrappers = []
    for fut in futures:
        assert isinstance(fut, asyncio.Future)  # we need Future or Task
        # Wrap the future in one that completes when the original does,
        # and whose result is the original future object.
        wrapper = loop.create_future()
        fut.add_done_callback(wrapper.set_result)
        wrappers.append(wrapper)

    for next_completed in asyncio.as_completed(wrappers):
        # awaiting next_completed will dereference the wrapper and get
        # the original future (which we know has completed), so we can
        # just yield that
        yield await next_completed

Это должно позволить вам получить исходные задачи - вотпростой тестовый пример:

async def main():
    loop = asyncio.get_event_loop()
    fut1 = loop.create_task(asyncio.sleep(.2))
    fut1.t = .2
    fut2 = loop.create_task(asyncio.sleep(.3))
    fut2.t = .3
    fut3 = loop.create_task(asyncio.sleep(.1))
    fut3.t = .1
    async for fut in as_completed_async([fut1, fut2, fut3]):
        # using the `.t` attribute shows that we've got the original tasks
        print('completed', fut.t)

asyncio.get_event_loop().run_until_complete(main())
0 голосов
/ 04 апреля 2019

Решено с помощью asyncio.gather вместо:

async def _check_tasks(self):
    while self._check_tasks_task or not self._check_task_queue.empty():
        tasks = []
        details = []
        try:
            while len(tasks) < self._CHECK_TASKS_MAX_COUNT:
                detail = self._check_task_queue.get_nowait()
                task = detail['task']
                tasks.append(task)
                details.append(detail)
        except asyncio.QueueEmpty:
            pass

        if tasks:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    detail = details[i]
                    error_handler = detail.get('error_handler')
                    error_handler_args = detail.get('error_handler_args', [])
                    error_handler_kwargs = detail.get('error_handler_kwargs', {})

                    if error_handler:
                        logger.info('calling error handler')
                        if inspect.iscoroutinefunction(error_handler):
                            self.enqueue_task(
                                task=error_handler(
                                    result,
                                    *error_handler_args,
                                    **error_handler_kwargs
                                )
                            )
                        else:
                            error_handler(
                                result, *error_handler_args, **error_handler_kwargs
                            )
                    else:
                        msg = f'Exception encountered while handling task: {str(result)}'
                        logger.exception(msg)
        else:
            await asyncio.sleep(self._QUEUE_EMPTY_SLEEP_TIME)
...