Поймать исключения в отдельных задачах и перезапустить их - PullRequest
0 голосов
/ 15 марта 2019

Если я создаю серию asyncio задач в классе верхнего уровня, все из которых должны выполняться вечно, например:

self.event_loop.create_task(...)
self.event_loop.create_task(...)
self.event_loop.create_task(...)
...

self.event_loop.run_forever()

# Once we fall out of the event loop, collect all remaining tasks,
# cancel them, and terminate the asyncio event loop
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
self.event_loop.run_until_complete(group)
self.event_loop.close()

Приведенный выше код не обрабатывает следующую ситуацию, которая, на мой взгляд, мне нужна все больше и больше, и я не видел примера в Google или в документах asyncio:

Если одна из задач завершается с ошибкой, исключение не обрабатывается - все другие задачи продолжаются, но эта задача просто молча останавливается (кроме вывода исключения).

Итак, как я могу:

  • Настройка обработки исключительной ситуации, чтобы ошибка больше не молчала
  • Самое главное, перезапустить неудавшуюся задачу, снова эффективно запустив self.event_loop.create_task(...), только для этой задачи? Казалось бы, для этого нужно найти задачу, получившую исключение, в цикле событий, удалить ее и добавить новую - как это сделать, мне не ясно.
  • Разрешить задачам, у которых не было проблем, продолжать работу без перерыва. Хотите избежать каких-либо побочных эффектов при обработке задачи, получившей исключение.

1 Ответ

4 голосов
/ 15 марта 2019

Необработанные исключения прикреплены к объекту задачи и могут быть получены из него с помощью Task.exception() метода .Вызов self.event_loop.create_task(...) возвращает объект задачи, поэтому вы хотите собрать его для проверки исключений.

Если вы хотите перепланировать задачу при возникновении исключения, то вы должны сделать это вновая задача (потому что вы хотите, чтобы она выполнялась в цикле событий), или используйте подпрограмму-обертку, которая перехватывает исключения и просто повторно запускает данную сопрограмму.

Последняя может выглядеть примерно так:

import traceback

def rerun_on_exception(coro, *args, **kwargs):
    while True:
        try:
            await coro(*args, **kwargs)
        except asyncio.CancelledError:
            # don't interfere with cancellations
            raise
        except Exception:
            print("Caught exception")
            traceback.print_exc()

затем оберните ваши сопрограммы вышеуказанной сопрограммой при планировании их как задачи:

self.event_loop.create_task(rerun_on_exception(coroutine_uncalled, arg1value, ... kwarg1=value, ...)

например, передавая аргументы для создания сопрограммы каждый раз, когда возникает исключение.

Другой вариант - использовать asyncio.wait() в отдельной задаче, чтобы вы могли отслеживать исключения во время выполнения цикла и принимать решения о том, как обрабатывать исключения тут же:

def exception_aware_scheduler(*task_definitions, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    task_arguments = {
        loop.create_task(coro(*args, **kwargs)): (coro, args, kwargs)
        for coro, args, kwargs in task_definitions
    }
    while tasks:
        done, pending = await asyncio.wait(
            tasks.keys(), loop=loop, return_when=asyncio.FIRST_EXCEPTION
        )
        for task in done:
            if task.exception() is not None:
                print('Task exited with exception:')
                task.print_stack()
                print('Rescheduling the task\n')
                coro, args, kwargs = tasks.pop(task)
                tasks[loop.create_task(coro(*args, **kwargs))] = coro, args, kwargs

Вызов asyncio.wait() снова контролируется циклом событий, когда любая из запланированных задач завершается из-за исключения, но до тех пор, пока задачи не будут выполненыn отменили или просто завершили свою работу.Когда задача завершается из-за исключительной ситуации, вам нужен способ снова создать ту же подпрограмму (с теми же аргументами), следовательно, настройку *args, **kwargs выше.

Вы запланировали бы только exception_aware_scheduler(), передавая задачи, которые вы хотели передать:

tasks = (
    (coro1, (), {}),  # no arguments
    (coro2, ('arg1', 'arg2'), {}),
    # ...
)
loop.create_task(exception_aware_scheduler(*task_definitions, loop=loop))
...