Как правильно обрабатывать отмененные задачи в Python `asyncio.gather` - PullRequest
3 голосов
/ 09 января 2020

Итак, я делаю еще один удар по модулю asyncio, когда вышел 3.8. Тем не менее, я получаю неожиданные результаты при попытке сделать изящное завершение события l oop. В частности, я слушаю SIGINT, отменяю запущенные Task с, собираю эти Task с, а затем .stop() с событием l oop. Я знаю, что Task поднимают CancelledError, когда они отменяются, что будет распространяться вверх и завершать мой вызов на asyncio.gather, если, согласно документации , я не передам return_exceptions=True на asyncio.gather , что должно заставить gather ожидать отмены всех Task s и возвращать массив CancelledError s. Однако, похоже, что return_exceptions=True все равно приводит к немедленному прерыванию моего gather вызова, если я пытаюсь gather отменить Task s.

Вот код для воспроизведения эффекта. Я бегу python 3.8.0:

# demo.py

import asyncio
import random
import signal


async def worker():
    sleep_time = random.random() * 3
    await asyncio.sleep(sleep_time)
    print(f"Slept for {sleep_time} seconds")

async def dispatcher(queue):
    while True:
        await queue.get()
        asyncio.create_task(worker())
        tasks = asyncio.all_tasks()
        print(f"Running Tasks: {len(tasks)}")

async def shutdown(loop):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    print(f"Cancelling {len(tasks)} outstanding tasks")
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(f"results: {results}")
    loop.stop()

async def main():
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
    queue = asyncio.Queue()
    asyncio.create_task(dispatcher(queue))

    while True:
        await queue.put('tick')
        await asyncio.sleep(1)


asyncio.run(main())

Вывод:

>> python demo.py 
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
  File "demo.py", line 38, in <module>
    asyncio.run(main())
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

Я предполагаю, что в событии l oop есть что-то, чего я не делаю понимаю, но я ожидаю, что все CancelledError вернутся в виде массива объектов, хранящихся в results, и затем смогут продолжить, а не сразу увидеть ошибку.

1 Ответ

4 голосов
/ 09 января 2020

Что вызывает ошибку?

Проблема с использованием asyncio.all_tasks() заключается в том, что он возвращает ВСЕ задачи, даже те, которые вы не создавали напрямую. Измените свой код следующим образом, чтобы увидеть, что вы отменяете:

for task in tasks:
    print(task)
    task.cancel()

Вы увидите не только worker связанных задач, но также:

<Task pending coro=<main() running at ...>

Отмена main приводит к беспорядок внутри asyncio.run(main()) и вы получите ошибку. Давайте сделаем быструю / грязную модификацию, чтобы исключить эту задачу из отмены:

tasks = [
    t 
    for t 
    in asyncio.all_tasks() 
    if (
        t is not asyncio.current_task()
        and t._coro.__name__ != 'main'
    )
]

for task in tasks:
    print(task)
    task.cancel()

Теперь вы увидите, что results.

l oop .stop () приводит к ошибке

Пока вы набрали results, вы получите еще одну ошибку Event loop stopped before Future completed. Это происходит потому, что asyncio.run(main()) хотят работать до завершения main().

Вы должны реструктурировать свой код, чтобы разрешить выполнение сопрограммы, переданной вами в asyncio.run, вместо остановки события l oop или, например, , используйте l oop .run_forever () вместо asyncio.run.

Вот быстрая / грязная демонстрация того, что я имею в виду:

async def shutdown(loop):
    # ...

    global _stopping
    _stopping = True
    # loop.stop()

_stopping = False

async def main():
    # ...

    while not _stopping:
        await queue.put('tick')
        await asyncio.sleep(1)

Теперь ваш код будет работать без ошибок. Не используйте приведенный выше код на практике, это всего лишь пример. Попробуйте реструктурировать свой код, как я упоминал выше.

Как правильно обрабатывать задачи

Не использовать asyncio.all_tasks().

Если вы создаете некоторые задачи, которые вы хотите отменить в будущем, сохраните их и отмените только сохраненные задачи. Псевдокод:

i_created = []

# ...

task = asyncio.create_task(worker())
i_created.append(task)

# ...

for task in i_created:
    task.cancel()

Может показаться неудобным, но это способ убедиться, что вы не отменяете то, что не хотите отменять.

Еще одна вещь

Обратите также внимание, что asyncio.run() делает намного больше , чем просто начало события l oop. В частности, отменяет все зависающие задачи перед завершением. В некоторых случаях это может быть полезно, хотя я советую вместо этого обрабатывать все отмены вручную.

...