Представьте, что мы пишем приложение, которое позволяет пользователю непрерывно запускать приложение (скажем, это серия важных операций над API) и может одновременно запускать несколько приложений. Требования включают в себя:
- пользователь может контролировать количество одновременных приложений (что может ограничивать одновременную загрузку для API, что часто важно)
- , если ОС пытается закрыть Python программа, выполняющая эту вещь, должна изящно завершиться, позволяя любым выполняющимся приложениям завершить свой запуск перед закрытием
Вопрос здесь конкретно о диспетчере задач, который мы кодировали, поэтому давайте заглушим некоторый код, который иллюстрирует эту проблему:
import asyncio
import signal
async def work_chunk():
"""Simulates a chunk of work that can possibly fail"""
await asyncio.sleep(1)
async def protected_work():
"""All steps of this function MUST complete, the caller should shield it from cancelation."""
print("protected_work start")
for i in range(3):
await work_chunk()
print(f"protected_work working... {i+1} out of 3 steps complete")
print("protected_work done... ")
async def subtask():
print("subtask: starting loop of protected work...")
cancelled = False
while not cancelled:
protected_coro = asyncio.create_task(protected_work())
try:
await asyncio.shield(protected_coro)
except asyncio.CancelledError:
cancelled = True
await protected_coro
print("subtask: cancelation complete")
async def subtask_manager():
"""
Manage a pool of subtask workers.
(In the real world, the user can dynamically change the concurrency, but here we'll
hard code it at 3.)
"""
tasks = {}
while True:
for i in range(3):
task = tasks.get(i)
if not task or task.done():
tasks[i] = asyncio.create_task(subtask())
await asyncio.sleep(5)
def shutdown(signal, main_task):
"""Cleanup tasks tied to the service's shutdown."""
print(f"Received exit signal {signal.name}. Scheduling cancelation:")
main_task.cancel()
async def main():
print("main... start")
coro = asyncio.ensure_future(subtask_manager())
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, coro))
loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, coro))
await coro
print("main... done")
def run():
asyncio.run(main())
run()
subtask_manager
управляет пулом работников, периодически просматривая текущее требование параллелизма и соответствующим образом обновляя количество активных работников (обратите внимание, что код выше сокращает большую часть этого, и просто жестко кодирует число, так как это не важно для вопроса).
subtask
- это сам рабочий l oop, который непрерывно работает protected_work()
, пока кто-то не отменит его.
Но этот код не работает. Когда вы даете ему SIGINT, все это сразу падает.
Прежде чем я объясню дальше, позвольте мне указать вам критический бит кода:
1 protected_coro = asyncio.create_task(protected_work())
2 try:
3 await asyncio.shield(protected_coro)
4 except asyncio.CancelledError:
5 cancelled = True
6 await protected_coro # <-- This will raise CancelledError too!
После некоторой отладки, мы обнаруживаем, что наш блок try / исключением не работает. Мы находим, что и строка 3 И строка 6 поднимают CancelledError.
Когда мы копаем дальше, мы обнаруживаем, что ВСЕ вызовы "await" генерируют CancelledError после отмены менеджера подзадач, а не только линия отмечена выше. (т. е. вторая строка work_chunk (), await asyncio.sleep(1)
и 4-я строка protected_work (), await work_chunk()
, также повысить CancelledError.)
Что здесь происходит?
Казалось бы, Python почему-то не распространяет отмену, как вы ожидаете, а просто вскидывает руки и говорит: «Я отменяю все сейчас».
Почему?
Ясно, я не понимаю, как распространение отмены работает в Python. Я изо всех сил пытался найти документацию о том, как это работает. Может ли кто-нибудь описать мне, как отмена распространяется ясным образом, который объясняет поведение, обнаруженное в примере выше?