Немедленно отмените задачу python asyncio, когда все задачи ожидают - PullRequest
0 голосов
/ 15 апреля 2020

Похоже, что событие python l oop ничего не будет делать, пока внутренне что-то не будет выполнено. Это означает, что отмена задач на самом деле ничего не сделает, пока одна из задач не проснется и не начнет выполняться.

SSCCE:

import asyncio
import signal
import time

start_time = time.time()
task1 = None
task2 = None


async def wait(name, duration):
    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        print(f"{name} got cancellation ({(time.time() - start_time):.2})")
        pass


async def main():
    global task1
    global task2
    task1 = asyncio.ensure_future(wait("task1", 5))
    task2 = asyncio.ensure_future(wait("taks2", 10))

    await asyncio.gather(task1, task2)


def run():
    def handle_signal(a, b):
        print(f"cancelling main task ({(time.time() - start_time):.2})")
        task1.cancel()
        task2.cancel()
    signal.signal(signal.SIGINT, handle_signal)

    asyncio.run(main())
    print('done')


if __name__ == "__main__":
    run()

Вывод:

$ python3.7 async.py 
^Ccancelling main task (0.88)
task1 got cancellation (5.0)
taks2 got cancellation (5.0)
done

Как я могу заставить событие l oop немедленно отменить эти задачи, не дожидаясь его пробуждения из сна?

Используя python3 .6 и python3 .7

1 Ответ

1 голос
/ 15 апреля 2020

Я оставляю старое содержание этого ответа в качестве демонстрации методов, которые можно использовать при работе с обработчиками сигналов, но оказывается, что у asyncio действительно есть собственный API-интерфейс для обработки сигналов.

loop.add_signal_handler можно использовать для установки асинхронно-совместимых обработчиков сигналов. Они будут вызываться под контролем события l oop, и, в отличие от обычных обработчиков сигналов, они могут безопасно взаимодействовать с событием l oop. (Обратите внимание, что это означает, что событие l oop должно ждать, пока любая выполняющаяся в настоящий момент сопрограмма выдаст управление, прежде чем l oop сможет запустить обработчики сигналов.) Обработчики сигналов, установленные через этот API, не должны принимать аргументов, в отличие от обычных обработчиков сигналов.

Если вы удалите аргументы из определения вашего обработчика сигнала и установите обработчик с asyncio.get_event_loop().add_signal_handler(signal.SIGINT, handle_signal), это будет просто работать. На самом деле, было бы лучше установить обработчик из main после создания задач и использовать asyncio.get_running_loop():

def handle_signal():
    print(f"cancelling main task ({(time.time() - start_time):.2})")
    task1.cancel()
    task2.cancel()

async def main():
    global task1
    global task2
    task1 = asyncio.ensure_future(wait("task1", 5))
    task2 = asyncio.ensure_future(wait("taks2", 10))

    asyncio.get_running_loop().add_signal_handler(signal.SIGINT, handle_signal)

    await asyncio.gather(task1, task2)

Далее следует старый ответ, который выполняет работу в обычном режиме. API обработки сигналов, а не API asyncio, потому что я не знал, что у asyncio был API сигналов, когда я его писал.


У вас есть две основные проблемы:

Проблема 1 в том, что пытаться напрямую отменить задачи из обработчика сигнала уже небезопасно. Обработчики сигналов могут работать, пока asyncio выполняет внутреннюю работу, а его структуры данных находятся в несогласованном состоянии. Этот тип проблем даже хуже, чем проблемы параллелизма, потому что блокировка ничего не исправляет. В обработчике сигналов вы можете безопасно сделать очень мало.

Проблема 2 в том, что task.cancel() организует выброс CancelledError в завернутую сопрограмму на следующей итерации события l oop. Вам нужно что-то для запуска итерации события l oop.


Одна из самых сложных и полезных вещей, которые вы можете сделать в обработчике сигнала Python, - это отправка сообщений через queue.SimpleQueue. Это очередь синхронизации, которая обменивает ограниченные функциональные возможности на возможность безопасного вызова его метода put в опасных условиях, таких как __del__ методы, слабые обратные вызовы и другой код, который может прерывать сложные операции в том же потоке.

queue.SimpleQueue.get не является asyn c -совместимым, поэтому код, который получает сообщение, должен быть в другом потоке. Этот поток также не может безопасно вызывать cancel, поскольку только очень ограниченные части asyncio предназначены для обеспечения безопасности потоков, а Task.cancel не является одной из частей, задокументированных как безопасные для потоков. Фактически, второе предложение документации asyncio.Task - «Не поточно-ориентированный».

Однако вы можете получить задачу, запустить функцию в потоке с loop.run_in_executor, и эта функция получит сообщение об отмене из очереди. Задача может дождаться завершения этой функции до sh, а затем безопасно выполнить отмену изнутри события l oop. Другая итерация события l oop будет запущена, как только задача отмены завершится, и задачи, которые вы хотите отменить, будут отменены.

import asyncio
import queue
import signal
import time

start_time = time.time()
task1 = None
task2 = None

cancel_queue = queue.SimpleQueue()

async def wait(name, duration):
    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        print(f"{name} got cancellation ({(time.time() - start_time):.2})")
        pass

async def canceller_task():
    do_cancel = await asyncio.get_running_loop().run_in_executor(None, receive_cancel_message)
    if do_cancel:
        task1.cancel()
        task2.cancel()

def receive_cancel_message():
    return cancel_queue.get()

def handle_signal(a, b):
    cancel_queue.put(True)

async def main():
    global task1
    global task2
    task1 = asyncio.ensure_future(wait("task1", 5))
    task2 = asyncio.ensure_future(wait("taks2", 10))
    task3 = asyncio.ensure_future(canceller_task())

    await asyncio.gather(task1, task2)

    # End canceller_task and receive_cancel_message even if the signal
    # handler didn't fire.
    # May leave an extra False in the queue if the signal handler fired.
    # That's okay.
    cancel_queue.put(False)
    await task3


def run():
    signal.signal(signal.SIGINT, handle_signal)

    asyncio.run(main())
    print('done')


if __name__ == "__main__":
    run()
...