Я пытаюсь понять шаблон для бесконечно запущенных задач asyncio и разницу, которую имеет пользовательский обработчик сигнала l oop. Я создаю работников, используя loop.create_task()
, чтобы они выполнялись одновременно.
В коде моих постоянных работников я опрашиваю данные и действую соответствующим образом, когда есть данные.
Я пытаюсь обработать данные процесс отключения изящно по сигналу. Когда поступает сигнал - я снова create_task()
с функцией выключения, так что выполняющиеся в данный момент задачи продолжаются, и завершение работы выполняется в следующей итерации события l oop. Теперь - когда while
l oop одного работника фактически не выполняет ввод-вывод или работу, он предотвращает выполнение обработчика сигнала. Он никогда не заканчивается и не возвращает выполнение, чтобы можно было запускать другие задачи.
Когда я не присоединяю пользовательский обработчик сигнала к al oop и не запускаю эту программу, тогда сигнал доставляется и Программа останавливается. Я предполагаю, что это главный поток, который останавливает сам l oop. Это, очевидно, отличается от попытки запланировать (новую) задачу выключения на работающем l oop, потому что работающий l oop застрял в одной сопрограмме, которая через некоторое время блокируется l oop и не дает вернуть какой-либо контроль или время для других задач.
Существует ли какой-либо стандартный шаблон для таких случаев? Нужно ли asyncio.sleep()
, если нет никакой работы, заменить ли while l oop на что-то другое (например, переназначить саму рабочую функцию)?
Если range(5)
заменен на range(1, 5)
тогда все работники ждут asyncio.sleep, но если один из них этого не делает, то все блокируется. Как справиться с этим случаем, есть ли какой-нибудь стандартный подход?
Приведенный ниже код иллюстрирует проблему.
async def shutdown(loop, sig=None):
print("SIGNAL", sig)
tasks = [t for t in asyncio.all_tasks()
if t is not asyncio.current_task()]
[t.cancel() for t in tasks]
results = await asyncio.gather(*tasks, return_exceptions=True)
# handle_task_results(results)
loop.stop()
async def worker(intval):
print("start", intval)
while True:
if intval:
print("#", intval)
await asyncio.sleep(intval)
loop = asyncio.get_event_loop()
for sig in {signal.SIGINT, signal.SIGTERM}:
loop.add_signal_handler(
sig,
lambda s=sig: asyncio.create_task(shutdown(loop, sig=s)))
workers = [loop.create_task(worker(i)) for i in range(5)] # this range
loop.run_forever()