Как я могу запустить asyncio l oop, если остались незавершенные задачи с защитой от отмены, но их больше нет? - PullRequest
3 голосов
/ 29 мая 2020

Я пытаюсь добавить код в свой существующий asyncio l oop, чтобы обеспечить чистое завершение работы по Ctrl- C. Ниже представлена ​​абстракция того, что он делает.

import asyncio, signal

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

#def main():
#    try:
#        loop = asyncio.get_event_loop()
#        loop.create_task(aiomain())
#        loop.run_forever()
#    except asyncio.CancelledError:
#        pass

if __name__ == '__main__':
    main()

В этом примере представьте, что последовательность task1 и task2 должна быть завершена после ее запуска, иначе возникнут некоторые артефакты. осталось в непоследовательном состоянии. (Отсюда и оболочка asyncio.shield вокруг вызова tasks.)

С указанным выше кодом, если я прерву сценарий вскоре после его запуска, и он просто напечатает Starting simulated task1, то l oop остановится и task2 никогда не запускается. Если я попытаюсь переключиться на версию main, которая закомментирована, она никогда не выйдет, даже если l oop правильно отменен, и больше ничего не происходит по крайней мере в течение нескольких минут. В нем действительно есть небольшой прогресс в том, что он, по крайней мере, завершает любую незавершенную последовательность task1 и task2.

Некоторые возможные решения в результате мозгового штурма, хотя я все еще чувствую, что должно быть что-то попроще что мне не хватает:

  • Создайте оболочку вокруг asyncio.shield, которая увеличивает переменную, синхронизируемую объектом asyncio.Condition, запускает экранированную функцию, а затем уменьшает переменную. Затем в aiomain в обработчике CancelledError дождитесь, пока переменная достигнет нуля, прежде чем повторно вызвать исключение. (В реализации я бы, вероятно, go объединил бы все части этого в один класс с __aexit__, реализовав ожидание нуля на CancelledError logi c.)
  • Пропустите, используя asyncio полностью отменяет механизм отмены, и вместо этого используйте asyncio.Event или аналогичный, чтобы учесть точки прерывания или прерывистый сон. Хотя это действительно кажется более инвазивным, требуя, чтобы я указывал, какие точки считаются прерываемыми, вместо того, чтобы объявлять, какие последовательности необходимо защитить от отмены.

Ответы [ 2 ]

3 голосов
/ 31 мая 2020

Это очень хороший вопрос. Я кое-что узнал, пока работал над ответом, поэтому надеюсь, что вы все еще следите за этой веткой.

Первое, что нужно исследовать, - как работает метод shield ()? По этому поводу документы, мягко говоря, сбивают с толку. Я не мог понять этого, пока не прочитал тестовый код стандартной библиотеки в test_tasks.py. Вот мое понимание:

Рассмотрим этот фрагмент кода:

async def coro_a():
    await asyncio.sheild(task_b())
    ...
task_a = asyncio.create_task(coro_a())
task_a.cancel()

Когда выполняется оператор task_a.cancel (), task_a действительно отменяется. Оператор await немедленно генерирует CancelledError , не дожидаясь, пока task_b завершит sh. Но task_b продолжает работать. Внешняя задача (a) останавливается, а внутренняя задача (b) - нет.

Вот модифицированная версия вашей программы, которая это иллюстрирует. Основное изменение состоит в том, чтобы вставить ожидание в обработчик исключения CancelledError, чтобы ваша программа работала на несколько секунд дольше. Я использую Windows, и поэтому я немного изменил ваш обработчик сигналов, но это второстепенный момент. Я также добавил отметки времени к операторам печати. ​​

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    task = asyncio.create_task(task_loop())
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await asyncio.sleep(5.0)
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Output when ctrlC is struck during task1
# 
# Starting simulated task1 1590871747.8977509
# Got Control-C 1590871750.8385916
# True
# Shutting down task loop 1590871750.8425908
# Caught CancelledError 1590871750.8435903
# Finished simulated task1 1590871752.908434
# Starting simulated task2 1590871752.908434
# Program exit, cancelled 1590871755.8488846        

if __name__ == '__main__':
    main()

Вы можете видеть, что ваша программа не работала, потому что она завершилась, как только task_l oop была отменена, до того, как task1 и task2 имели возможность фини sh. Они все еще были там (или, скорее, они были бы там, если бы программа продолжала работать).

Это иллюстрирует, как взаимодействуют shield () и cancel (), но на самом деле это не решает ваши заявленные проблема. Для этого, я думаю, вам нужен ожидаемый объект, который вы можете использовать, чтобы поддерживать программу в рабочем состоянии до тех пор, пока не будут завершены жизненно важные задачи. Этот объект нужно создать на верхнем уровне и передать вниз по стеку туда, где выполняются жизненно важные задачи. Вот программа, похожая на вашу, но преформируется так, как вы хотите.

Я выполнил три запуска: (1) control- C во время задачи1, (2) control- C во время задачи2, (3) control- C после завершения обеих задач. В первых двух случаях программа продолжалась до завершения задачи 2. В третьем случае он закончился немедленно.

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks(kwrap):
    fut = asyncio.get_running_loop().create_future()
    kwrap.awaitable = fut
    await task1()
    await task2()
    fut.set_result(1)

async def task_loop(kwrap):
    try:
        while True:
            await asyncio.shield(tasks(kwrap))
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    kwrap = KillWrapper()
    task = asyncio.create_task(task_loop(kwrap))
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await kwrap.awaitable
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

class KillWrapper:
    def __init__(self):
        self.awaitable = asyncio.get_running_loop().create_future()
        self.awaitable.set_result(0)

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Run 1 Control-C during task1
# Starting simulated task1 1590872408.6737766
# Got Control-C 1590872410.7344952
# True
# Shutting down task loop 1590872410.7354996
# Caught CancelledError 1590872410.7354996
# Finished simulated task1 1590872413.6747622
# Starting simulated task2 1590872413.6747622
# Finished simulated task2 1590872418.6750958
# Program exit, cancelled 1590872418.6750958
#
# Run 1 Control-C during task2
# Starting simulated task1 1590872492.927735
# Finished simulated task1 1590872497.9280624
# Starting simulated task2 1590872497.9280624
# Got Control-C 1590872499.5973852
# True
# Shutting down task loop 1590872499.5983844
# Caught CancelledError 1590872499.5983844
# Finished simulated task2 1590872502.9274273
# Program exit, cancelled 1590872502.9287038
#
# Run 1 Control-C after task2 -> immediate exit
# Starting simulated task1 1590873694.2925708
# Finished simulated task1 1590873699.2928336
# Starting simulated task2 1590873699.2928336
# Finished simulated task2 1590873704.2938952
# Got Control-C 1590873706.0790765
# True
# Shutting down task loop 1590873706.0804725
# Caught CancelledError 1590873706.0804725
# Program exit, cancelled 1590873706.0814824
0 голосов
/ 31 мая 2020

Вот что я в итоге использовал:

import asyncio, signal

async def _shield_and_wait_body(coro, finish_event):
    try:
        await coro
    finally:
        finish_event.set()

async def shield_and_wait(coro):
    finish_event = asyncio.Event()
    task = asyncio.shield(_shield_and_wait_body(coro, finish_event))
    try:
        await task
    except asyncio.CancelledError:
        await finish_event.wait()
        raise

def shield_and_wait_decorator(coro_fn):
    return lambda *args, **kwargs: shield_and_wait(coro_fn(*args, **kwargs))

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

@shield_and_wait_decorator
async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            # Alternative to applying @shield_and_wait_decorator to tasks()
            #await shield_and_wait(tasks())
            await tasks()
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

def sigint_handler(task):
    print("Cancelling task loop")
    task.cancel()

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, sigint_handler, task)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    main()

Подобно ответу Пола Корнелиуса, это вставляет ожидание завершения подзадачи sh, прежде чем разрешить CancelledError распространиться на цепочка вызовов. Однако для этого не требуется прикасаться к коду, кроме того момента, когда вы вызываете asyncio.shield.

(В моем фактическом варианте использования у меня было одновременно запущено три цикла, используя asyncio.Lock, чтобы убедиться, что одна задача или последовательность задач завершились до того, как начнется другая. У меня также был asyncio.Condition на этой блокировке, обменивающейся от одной сопрограммы к другой. Когда я пробовал подход ожидания в aiomain или main, чтобы все экранированные задачи были готово, я столкнулся с проблемой, когда отмененный родитель освободил блокировку, а затем экранированная задача попыталась сигнализировать переменной условия, используя эту блокировку, давая ошибку. Также не имело смысла переносить получение и освобождение блокировки в экранированный задача - это приведет к тому, что задача B все еще будет выполняться в последовательности: экранированная задача A запускается, сопрограмма для задачи B истекает свой таймер и блокирует ожидание блокировки, Control + C. Помещая ожидание в точку shield_and_wait вызов, с другой стороны, он аккуратно избегал преждевременного снятия блокировки.)

Одно предостережение: кажется, что shield_and_wait_decorator не работает должным образом с методами класса.

...