Задачи, созданные с помощью create_task, которые никогда не ожидаются, похоже, нарушают ожидания отмены для дочерних задач - PullRequest
2 голосов
/ 13 февраля 2020

Представьте, что мы пишем приложение, которое позволяет пользователю непрерывно запускать приложение (скажем, это серия важных операций над API) и может одновременно запускать несколько приложений. Требования включают в себя:

  • пользователь может контролировать количество одновременных приложений (что может ограничивать одновременную загрузку для API, что часто важно)
  • , если ОС пытается закрыть Python программа, выполняющая эту вещь, должна изящно завершиться, позволяя любым выполняющимся приложениям завершить свой запуск перед закрытием

Вопрос здесь конкретно о диспетчере задач, который мы кодировали, поэтому давайте заглушим некоторый код, который иллюстрирует эту проблему:

import asyncio
import signal


async def work_chunk():
    """Simulates a chunk of work that can possibly fail"""
    await asyncio.sleep(1)


async def protected_work():
    """All steps of this function MUST complete, the caller should shield it from cancelation."""
    print("protected_work start")
    for i in range(3):
        await work_chunk()
        print(f"protected_work working... {i+1} out of 3 steps complete")
    print("protected_work done... ")


async def subtask():
    print("subtask: starting loop of protected work...")
    cancelled = False
    while not cancelled:
        protected_coro = asyncio.create_task(protected_work())
        try:
            await asyncio.shield(protected_coro)
        except asyncio.CancelledError:
            cancelled = True
            await protected_coro
    print("subtask: cancelation complete")


async def subtask_manager():
    """
    Manage a pool of subtask workers. 
    (In the real world, the user can dynamically change the concurrency, but here we'll 
    hard code it at 3.)
    """
    tasks = {}
    while True:
        for i in range(3):
            task = tasks.get(i)
            if not task or task.done():
                tasks[i] = asyncio.create_task(subtask())
        await asyncio.sleep(5)


def shutdown(signal, main_task):
    """Cleanup tasks tied to the service's shutdown."""
    print(f"Received exit signal {signal.name}. Scheduling cancelation:")
    main_task.cancel()


async def main():
    print("main... start")
    coro = asyncio.ensure_future(subtask_manager())
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, coro))
    loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, coro))
    await coro
    print("main... done")


def run():
    asyncio.run(main())


run()

subtask_manager управляет пулом работников, периодически просматривая текущее требование параллелизма и соответствующим образом обновляя количество активных работников (обратите внимание, что код выше сокращает большую часть этого, и просто жестко кодирует число, так как это не важно для вопроса).

subtask - это сам рабочий l oop, который непрерывно работает protected_work(), пока кто-то не отменит его.

Но этот код не работает. Когда вы даете ему SIGINT, все это сразу падает.

Screenshot of log output from invoking run(), illustrating that CancelledError is raised in the

Прежде чем я объясню дальше, позвольте мне указать вам критический бит кода:

1   protected_coro = asyncio.create_task(protected_work())
2   try:
3       await asyncio.shield(protected_coro)
4   except asyncio.CancelledError:
5       cancelled = True
6       await protected_coro  # <-- This will raise CancelledError too!

После некоторой отладки, мы обнаруживаем, что наш блок try / исключением не работает. Мы находим, что и строка 3 И строка 6 поднимают CancelledError.

Когда мы копаем дальше, мы обнаруживаем, что ВСЕ вызовы "await" генерируют CancelledError после отмены менеджера подзадач, а не только линия отмечена выше. (т. е. вторая строка work_chunk (), await asyncio.sleep(1) и 4-я строка protected_work (), await work_chunk(), также повысить CancelledError.)

Что здесь происходит?

Казалось бы, Python почему-то не распространяет отмену, как вы ожидаете, а просто вскидывает руки и говорит: «Я отменяю все сейчас».

Почему?

Ясно, я не понимаю, как распространение отмены работает в Python. Я изо всех сил пытался найти документацию о том, как это работает. Может ли кто-нибудь описать мне, как отмена распространяется ясным образом, который объясняет поведение, обнаруженное в примере выше?

Ответы [ 2 ]

1 голос
/ 14 февраля 2020

Что здесь происходит? Казалось бы, Python почему-то не распространяет отмену, как вы ожидаете, а просто вскидывает руки и говорит: «Я отменяю все сейчас».

TL; DR Отмена всего - это именно то, что происходит, просто потому, что происходит событие l oop.

Чтобы исследовать это, я изменил вызов add_signal_handler() на loop.call_later(.5, lambda: shutdown(signal.SIGINT, coro)). Python Ctrl + C обработка имеет нечетные углы , и я хотел проверить, является ли странное поведение результатом этого. Но ошибка была совершенно воспроизводимой без сигналов, так что это не так.

И все же, отмена асинхронного режима действительно не должна работать, как показывает ваш код. Отмена задачи распространяется на будущее (или другую задачу), которую она ожидает, но shield специально реализован, чтобы обойти это. Он создает и возвращает будущее fre sh и связывает результат исходного (экранированного) будущего с новым таким образом, что cancel() не знает, как ему следовать.

Мне потребовалось некоторое время, чтобы разобраться в том, что на самом деле происходит, и это:

  • await coro в конце main ждет задача, которая будет отменена, поэтому она получает CancelledError, как только shutdown отменяет его;

  • исключение приводит к выходу main и вводит последовательность очистки в конце asyncio.run(). Эта последовательность очистки отменяет все задач, включая те, которые вы экранировали.

Вы можете проверить это, изменив await coro в конце main() на :

try:
    await coro
finally:
    print('main... done')

И вы увидите, что "main ... done" напечатано до всех таинственных отмен, которые вы наблюдали.

Так что это очищает тайну и исправляет проблема, вы должны отложить выход main, пока все не будет сделано. Например, вы можете создать диктовку tasks в main, передать ее subtask_manager(), а затем дождаться выполнения этих критических задач при отмене основной задачи:

async def subtask_manager(tasks):
    while True:
        for i in range(3):
            task = tasks.get(i)
            if not task or task.done():
                tasks[i] = asyncio.create_task(subtask())

        try:
            await asyncio.sleep(5)
        except asyncio.CancelledError:
            for t in tasks.values():
                t.cancel()
            raise

# ... shutdown unchanged

async def main():
    print("main... start")
    tasks = {}
    main_task = asyncio.ensure_future(subtask_manager(tasks))
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, main_task))
    loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, main_task))
    try:
        await main_task
    except asyncio.CancelledError:
        await asyncio.gather(*tasks.values())
    finally:
        print("main... done")

Обратите внимание, что основная задача должен явно отменить свои подзадачи, потому что это на самом деле не произойдет автоматически. Отмена распространяется через цепочку await с, и subtask_manager явно не ожидает своих подзадач, она просто порождает их и ждет чего-то еще, эффективно их экранируя.

1 голос
/ 13 февраля 2020

После долгого изучения этой проблемы и экспериментирования с другими фрагментами кода (где распространение отмены работает, как и ожидалось), я начал задумываться, не является ли проблема Python не знанием порядок распространения здесь , в этом случае.

Но почему?

Ну, subtask_manager создает задачи, но не ждет их.

Может быть, Python не предполагает, что сопрограмма, создавшая эту задачу (с create_task), владеет этой задачей? Я думаю, Python использует ключевое слово await исключительно , чтобы узнать, в каком порядке распространять отмену, и если после обхода всего дерева задач он находит задачи, которые еще не был отменен, он просто уничтожил их все.

Поэтому мы сами должны управлять распространением отмены задач в любом месте, где, как мы знаем, мы не ожидали асинхронной задачи c. Итак, нам нужно провести рефакторинг subtask_manager, чтобы поймать его собственную отмену, и явно отменить , а затем дождаться всех его дочерних задач:

async def subtask_manager():
    """
    Manage a pool of subtask workers. 
    (In the real world, the user can dynamically change the concurrency, but here we'll 
    hard code it at 3.)
    """
    tasks = {}
    while True:
        for i in range(3):
            task = tasks.get(i)
            if not task or task.done():
                tasks[i] = asyncio.create_task(subtask())
        try:
            await asyncio.sleep(5)
        except asyncio.CancelledError:
            print("cancelation detected, canceling children")
            [t.cancel() for t in tasks.values()]
            await asyncio.gather(*[t for t in tasks.values()])
            return

Теперь наш код работает, как и ожидалось:

Working log output.

Примечание. Я ответил на свой собственный вопрос и ответ на вопрос, но все еще чувствую себя неудовлетворенным своим текстовым ответом о том, как работает распространение отмены. Если у кого-то есть лучшее объяснение того, как работает распространение отмены, я хотел бы прочитать его.

...