Как правильно завершить вложенные асинхронные задачи - PullRequest
0 голосов
/ 07 апреля 2020

I здесь спросил, как я могу сделать версию asyncio.gather, которая выполняет список задач последовательно, а не параллельно, и хорошие люди рассказали мне, как. Но моя радость закончилась, как только я попытался поместить два метода in_sequence () друг в друга:

import asyncio
from typing import Coroutine


async def work_a():
    print("Work 'A' start")
    await asyncio.sleep(0)
    print("Work 'A' finish")


async def work_b():
    print("Work 'B' start")
    await asyncio.sleep(0)
    print("Work 'B' finish")


async def raise_exception():
    print("raise_exception executed")
    await asyncio.sleep(0)
    raise RuntimeError("raise_exception executed")


async def in_sequence(*tasks: Coroutine):
    """Executes coroutines in sequence"""
    tasks = iter(tasks)
    try:
        for task in tasks:
            await task

    # Terminates all other tasks scheduled for execution in event loop to prevent "RuntimeWarning: coroutine was never awaited" message
    finally:
        for remaining_task in tasks:
            remaining_task.close()

async def main():
    try:
        await in_sequence(              # outer in_sequence()
            work_a(),
            raise_exception(),
            in_sequence(                # inner in_sequence()
                work_b()
            )
        )

    except Exception as e:
        print(f"+++ Quit with exception: {e}")


if __name__ == '__main__':
    asyncio.run(main())

В результате я получил следующий вывод: «RuntimeWarning: сопрограмма« work_b »никогда не ожидалась» ошибка:

Work 'A' start
Work 'A' finish
raise_exception executed
+++ Quit with exception: raise_exception executed
{some path}\terminate_inner_coroutine_issue.py:33: RuntimeWarning: coroutine 'work_b' was never awaited
  remaining_task.close()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

После нескольких экспериментов я понял, в чем проблема: когда метод lift_exception () генерирует исключение во внешнем методе in_sequence (), внешний метод in_sequence () завершает только свои дочерние задачи в блоке finally, т. е. только внутри in_sequence (). В свою очередь, внутренняя функция in_sequence () не завершает свою вложенную задачу work_b (), поскольку в момент возникновения исключения внутренняя функция in_sequence () еще не выполняется и возникает ошибка.

В дальнейших экспериментах Я «изобрел» следующую версию in_sequence (), которая, кажется, решает проблему:

async def in_sequence(name: str, *tasks: Coroutine):
    """Executes coroutines in sequence"""
    print(f"in_sequence '{name}' start")
    tasks = iter(tasks)
    try:
        for task in tasks:
            await task

    except asyncio.CancelledError as c:
        print(f"+++ in_sequence '{name}' catch CancelledError: {c}")

    finally:
        # Terminates all other tasks scheduled for execution in event loop
        for remaining_task in tasks:
            print(f"+++ Cancel {remaining_task}")
            remaining_task.send(None)
            try:
                # Will be raised asyncio.CancelledError in coroutine
                remaining_task.throw(asyncio.CancelledError)

            except asyncio.CancelledError:
                pass
            # remaining_task.close()                                                      # >>> RuntimeWarning: coroutine 'work_b' was never awaited
            # asyncio.create_task(remaining_task).cancel()                                # >>> RuntimeWarning: coroutine 'work_b' was never awaited
            # asyncio.create_task(remaining_task).set_exception(asyncio.CancelledError)   # >>> Exception - Task does not support set_exception operation

    print(f"in_sequence '{name}' finish")

и возвращает следующий вывод:

in_sequence '1' start
Work 'A' start
Work 'A' finish
raise_exception executed
+++ Cancel <coroutine object in_sequence at 0x0000024AF58B6AC0>
in_sequence '2' start
Work 'B' start
+++ in_sequence '2' catch CancelledError: 
in_sequence '2' finish
+++ Quit with exception: coroutine raised StopIteration

Вопросы:

  1. Это правильное решение или есть лучшие варианты? Как завершить дочерние задачи в такой ситуации?
  2. Это нормально, что в моей реализации выдается исключение 'стоп-вызов поднятой сопрограммой'?
...