I здесь спросил, как я могу сделать версию asyncio.gather, которая выполняет список задач последовательно, а не параллельно, и хорошие люди рассказали мне, как. Но моя радость закончилась, как только я попытался поместить два метода in_sequence () друг в друга:
import asyncio
from typing import Coroutine
async def work_a():
print("Work 'A' start")
await asyncio.sleep(0)
print("Work 'A' finish")
async def work_b():
print("Work 'B' start")
await asyncio.sleep(0)
print("Work 'B' finish")
async def raise_exception():
print("raise_exception executed")
await asyncio.sleep(0)
raise RuntimeError("raise_exception executed")
async def in_sequence(*tasks: Coroutine):
"""Executes coroutines in sequence"""
tasks = iter(tasks)
try:
for task in tasks:
await task
# Terminates all other tasks scheduled for execution in event loop to prevent "RuntimeWarning: coroutine was never awaited" message
finally:
for remaining_task in tasks:
remaining_task.close()
async def main():
try:
await in_sequence( # outer in_sequence()
work_a(),
raise_exception(),
in_sequence( # inner in_sequence()
work_b()
)
)
except Exception as e:
print(f"+++ Quit with exception: {e}")
if __name__ == '__main__':
asyncio.run(main())
В результате я получил следующий вывод: «RuntimeWarning: сопрограмма« work_b »никогда не ожидалась» ошибка:
Work 'A' start
Work 'A' finish
raise_exception executed
+++ Quit with exception: raise_exception executed
{some path}\terminate_inner_coroutine_issue.py:33: RuntimeWarning: coroutine 'work_b' was never awaited
remaining_task.close()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
После нескольких экспериментов я понял, в чем проблема: когда метод lift_exception () генерирует исключение во внешнем методе in_sequence (), внешний метод in_sequence () завершает только свои дочерние задачи в блоке finally, т. е. только внутри in_sequence (). В свою очередь, внутренняя функция in_sequence () не завершает свою вложенную задачу work_b (), поскольку в момент возникновения исключения внутренняя функция in_sequence () еще не выполняется и возникает ошибка.
В дальнейших экспериментах Я «изобрел» следующую версию in_sequence (), которая, кажется, решает проблему:
async def in_sequence(name: str, *tasks: Coroutine):
"""Executes coroutines in sequence"""
print(f"in_sequence '{name}' start")
tasks = iter(tasks)
try:
for task in tasks:
await task
except asyncio.CancelledError as c:
print(f"+++ in_sequence '{name}' catch CancelledError: {c}")
finally:
# Terminates all other tasks scheduled for execution in event loop
for remaining_task in tasks:
print(f"+++ Cancel {remaining_task}")
remaining_task.send(None)
try:
# Will be raised asyncio.CancelledError in coroutine
remaining_task.throw(asyncio.CancelledError)
except asyncio.CancelledError:
pass
# remaining_task.close() # >>> RuntimeWarning: coroutine 'work_b' was never awaited
# asyncio.create_task(remaining_task).cancel() # >>> RuntimeWarning: coroutine 'work_b' was never awaited
# asyncio.create_task(remaining_task).set_exception(asyncio.CancelledError) # >>> Exception - Task does not support set_exception operation
print(f"in_sequence '{name}' finish")
и возвращает следующий вывод:
in_sequence '1' start
Work 'A' start
Work 'A' finish
raise_exception executed
+++ Cancel <coroutine object in_sequence at 0x0000024AF58B6AC0>
in_sequence '2' start
Work 'B' start
+++ in_sequence '2' catch CancelledError:
in_sequence '2' finish
+++ Quit with exception: coroutine raised StopIteration
Вопросы:
- Это правильное решение или есть лучшие варианты? Как завершить дочерние задачи в такой ситуации?
- Это нормально, что в моей реализации выдается исключение 'стоп-вызов поднятой сопрограммой'?