Python (3.7) asyncio, рабочая задача с while l oop и пользовательский обработчик сигналов - PullRequest
0 голосов
/ 01 апреля 2020

Я пытаюсь понять шаблон для бесконечно запущенных задач asyncio и разницу, которую имеет пользовательский обработчик сигнала l oop. Я создаю работников, используя loop.create_task(), чтобы они выполнялись одновременно.

В коде моих постоянных работников я опрашиваю данные и действую соответствующим образом, когда есть данные.

Я пытаюсь обработать данные процесс отключения изящно по сигналу. Когда поступает сигнал - я снова create_task() с функцией выключения, так что выполняющиеся в данный момент задачи продолжаются, и завершение работы выполняется в следующей итерации события l oop. Теперь - когда while l oop одного работника фактически не выполняет ввод-вывод или работу, он предотвращает выполнение обработчика сигнала. Он никогда не заканчивается и не возвращает выполнение, чтобы можно было запускать другие задачи.

Когда я не присоединяю пользовательский обработчик сигнала к al oop и не запускаю эту программу, тогда сигнал доставляется и Программа останавливается. Я предполагаю, что это главный поток, который останавливает сам l oop. Это, очевидно, отличается от попытки запланировать (новую) задачу выключения на работающем l oop, потому что работающий l oop застрял в одной сопрограмме, которая через некоторое время блокируется l oop и не дает вернуть какой-либо контроль или время для других задач.

Существует ли какой-либо стандартный шаблон для таких случаев? Нужно ли asyncio.sleep(), если нет никакой работы, заменить ли while l oop на что-то другое (например, переназначить саму рабочую функцию)?

Если range(5) заменен на range(1, 5) тогда все работники ждут asyncio.sleep, но если один из них этого не делает, то все блокируется. Как справиться с этим случаем, есть ли какой-нибудь стандартный подход?

Приведенный ниже код иллюстрирует проблему.

async def shutdown(loop, sig=None):
    print("SIGNAL", sig)
    tasks = [t for t in asyncio.all_tasks()
             if t is not asyncio.current_task()]

    [t.cancel() for t in tasks]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    # handle_task_results(results)
    loop.stop()


async def worker(intval):
    print("start", intval)
    while True:
        if intval:
            print("#", intval)
            await asyncio.sleep(intval)


loop = asyncio.get_event_loop()

for sig in {signal.SIGINT, signal.SIGTERM}:
    loop.add_signal_handler(
        sig,
        lambda s=sig: asyncio.create_task(shutdown(loop, sig=s)))

workers = [loop.create_task(worker(i)) for i in range(5)]  # this range

loop.run_forever()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...