Я оставляю старое содержание этого ответа в качестве демонстрации методов, которые можно использовать при работе с обработчиками сигналов, но оказывается, что у asyncio действительно есть собственный API-интерфейс для обработки сигналов.
loop.add_signal_handler
можно использовать для установки асинхронно-совместимых обработчиков сигналов. Они будут вызываться под контролем события l oop, и, в отличие от обычных обработчиков сигналов, они могут безопасно взаимодействовать с событием l oop. (Обратите внимание, что это означает, что событие l oop должно ждать, пока любая выполняющаяся в настоящий момент сопрограмма выдаст управление, прежде чем l oop сможет запустить обработчики сигналов.) Обработчики сигналов, установленные через этот API, не должны принимать аргументов, в отличие от обычных обработчиков сигналов.
Если вы удалите аргументы из определения вашего обработчика сигнала и установите обработчик с asyncio.get_event_loop().add_signal_handler(signal.SIGINT, handle_signal)
, это будет просто работать. На самом деле, было бы лучше установить обработчик из main
после создания задач и использовать asyncio.get_running_loop()
:
def handle_signal():
print(f"cancelling main task ({(time.time() - start_time):.2})")
task1.cancel()
task2.cancel()
async def main():
global task1
global task2
task1 = asyncio.ensure_future(wait("task1", 5))
task2 = asyncio.ensure_future(wait("taks2", 10))
asyncio.get_running_loop().add_signal_handler(signal.SIGINT, handle_signal)
await asyncio.gather(task1, task2)
Далее следует старый ответ, который выполняет работу в обычном режиме. API обработки сигналов, а не API asyncio, потому что я не знал, что у asyncio был API сигналов, когда я его писал.
У вас есть две основные проблемы:
Проблема 1 в том, что пытаться напрямую отменить задачи из обработчика сигнала уже небезопасно. Обработчики сигналов могут работать, пока asyncio
выполняет внутреннюю работу, а его структуры данных находятся в несогласованном состоянии. Этот тип проблем даже хуже, чем проблемы параллелизма, потому что блокировка ничего не исправляет. В обработчике сигналов вы можете безопасно сделать очень мало.
Проблема 2 в том, что task.cancel()
организует выброс CancelledError
в завернутую сопрограмму на следующей итерации события l oop. Вам нужно что-то для запуска итерации события l oop.
Одна из самых сложных и полезных вещей, которые вы можете сделать в обработчике сигнала Python, - это отправка сообщений через queue.SimpleQueue
. Это очередь синхронизации, которая обменивает ограниченные функциональные возможности на возможность безопасного вызова его метода put
в опасных условиях, таких как __del__
методы, слабые обратные вызовы и другой код, который может прерывать сложные операции в том же потоке.
queue.SimpleQueue.get
не является asyn c -совместимым, поэтому код, который получает сообщение, должен быть в другом потоке. Этот поток также не может безопасно вызывать cancel
, поскольку только очень ограниченные части asyncio предназначены для обеспечения безопасности потоков, а Task.cancel
не является одной из частей, задокументированных как безопасные для потоков. Фактически, второе предложение документации asyncio.Task
- «Не поточно-ориентированный».
Однако вы можете получить задачу, запустить функцию в потоке с loop.run_in_executor
, и эта функция получит сообщение об отмене из очереди. Задача может дождаться завершения этой функции до sh, а затем безопасно выполнить отмену изнутри события l oop. Другая итерация события l oop будет запущена, как только задача отмены завершится, и задачи, которые вы хотите отменить, будут отменены.
import asyncio
import queue
import signal
import time
start_time = time.time()
task1 = None
task2 = None
cancel_queue = queue.SimpleQueue()
async def wait(name, duration):
try:
await asyncio.sleep(duration)
except asyncio.CancelledError:
print(f"{name} got cancellation ({(time.time() - start_time):.2})")
pass
async def canceller_task():
do_cancel = await asyncio.get_running_loop().run_in_executor(None, receive_cancel_message)
if do_cancel:
task1.cancel()
task2.cancel()
def receive_cancel_message():
return cancel_queue.get()
def handle_signal(a, b):
cancel_queue.put(True)
async def main():
global task1
global task2
task1 = asyncio.ensure_future(wait("task1", 5))
task2 = asyncio.ensure_future(wait("taks2", 10))
task3 = asyncio.ensure_future(canceller_task())
await asyncio.gather(task1, task2)
# End canceller_task and receive_cancel_message even if the signal
# handler didn't fire.
# May leave an extra False in the queue if the signal handler fired.
# That's okay.
cancel_queue.put(False)
await task3
def run():
signal.signal(signal.SIGINT, handle_signal)
asyncio.run(main())
print('done')
if __name__ == "__main__":
run()