run_until_complete завершается ошибкой даже после остановки l oop - PullRequest
0 голосов
/ 02 мая 2020

Я пытаюсь написать обработчик SIGTERM, который будет иметь мой run_forever () -l oop

  • Хватит принимать новые задачи.
  • Выполнено выполняющиеся задачи.
  • Завершение работы.

Вот учебное демо, которое я написал:

import asyncio
import signal
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)


class Looper:
    def __init__(self, loop):
        self._loop = loop
        self._shutdown = False
        signal.signal(signal.SIGINT, self._exit)
        signal.signal(signal.SIGTERM, self._exit)

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info(f"Received shutdown-signal: {sig} ({name})")
        self._shutdown = True
        self._loop.stop() # << Stopping the event loop here.
        _log.info(f"Loop stop initiated.")
        pending = asyncio.all_tasks(loop=self._loop)
        _log.info(f"Collected {len(pending)} tasks that have been stopped.")
        if pending:
            _log.info("Attempting to gather pending tasks: " + str(pending))
            gatherer_set = asyncio.gather(*pending, loop=self._loop)
            # self._loop.run_until_complete(gatherer_set) # << "RuntimeError: This event loop is already running"
        _log.info("Shutting down for good.")

    async def thumper(self, id, t):
        print(f"{id}: Winding up...")
        while not self._shutdown:
            await asyncio.sleep(t)
            print(f'{id}: Thump!')
        print(f'{id}: Thud.')


loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 2))
loop.create_task(lp.thumper('South Hall', 3))
loop.run_forever()
_log.info("Done.")

И на Windows 10, и на сценарии Debian 10, описанных выше, реагирует на SIGINT и выдает результат

North Hall: Winding up...
South Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
South Hall: Thump!
North Hall: Thump!
09:55:53 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
09:55:53 INFO [__main__]: Loop stop initiated.
09:55:53 INFO [__main__]: Collected 2 tasks that have been stopped.
09:55:53 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91BF0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:42> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x02F91C10>()]>>}
09:55:53 INFO [__main__]: Shutting down for good.
09:55:53 INFO [__main__]: Done.

К сожалению, строки «Thud.», означающие, что демонстрационные вызовы thumper (..) действительно завершены, не будут отображаться. Я полагаю, это потому, что «сбор» просто дает мне набор невыполненного будущего. Однако, если я осмелюсь активировать run_until_complete () - строку, даже если она идет позади self._l oop .stop () , вывод заканчивается следующим образом:

[...]
10:24:25 INFO [__main__]: Collected 2 tasks that have been stopped.
10:24:25 INFO [__main__]: Attempting to gather pending tasks: {<Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E417D0>()]>>, <Task pending coro=<Looper.thumper() running at amazing_grace.py:41> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x03E41BF0>()]>>}
Traceback (most recent call last):
  File "amazing_grace.py", line 50, in <module>
    loop.run_forever()
  File "C:\Python37\lib\asyncio\base_events.py", line 539, in run_forever
    self._run_once()
  File "C:\Python37\lib\asyncio\base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "C:\Python37\lib\selectors.py", line 323, in select
    r, w, _ = self._select(self._readers, self._writers, [], timeout)
  File "C:\Python37\lib\selectors.py", line 314, in _select
    r, w, x = select.select(r, w, w, timeout)
  File "amazing_grace.py", line 35, in _exit
    self._loop.run_until_complete(gatherer_set) # << "This event loop is already running"
  File "C:\Python37\lib\asyncio\base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "C:\Python37\lib\asyncio\base_events.py", line 526, in run_forever
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

Вопрос сводится к

  • , как вызвать или заменить run_until_complete (..) в этом сценарии и
  • почему я вижу этот «L oop выполняется» - ошибка после остановки l oop.

Программа должна работать на Python 3.7, оба под Windows 10 и Linux.

Редактировать пару дней спустя

Как заявляет Закест в своем / ее ответ, возникает проблема при назначении обработчика сигнала и добавлении вызова create_task внутри него; как я вижу, эта процедура может выполняться или не выполняться (даже если нет других задач). Поэтому теперь я добавил проверку sys.platform, чтобы проверить, выполняется ли сценарий под UNIX (). Если это так, я предпочитаю гораздо более надежный loop.add_signal_handler для определения функции обратного вызова, что мне действительно нужно. К счастью, UNIX - мой основной пример использования. Основная строка:

self._loop.add_signal_handler(signal.signal(signal.SIGINT, self._exit, signal.SIGINT, None)

Почему проверка платформы ?: После сигналов c, https://docs.python.org/3/library/asyncio-eventloop.html#unix , l oop .add_signal_handler () недоступно для Windows, что неудивительно, если учесть, что речь идет о UNIX lin go.

Ответы [ 2 ]

1 голос
/ 03 мая 2020
Обработчики сигналов

Python выполняются в главном потоке , в том же потоке, в котором работает ваш l oop. BaseEventLoop.stop() метод не сразу останавливает l oop, вместо этого он просто устанавливает флаг , поэтому, когда ваш l oop запускается в следующий раз, он выполняет только те обратные вызовы, которые уже запланированы, и не планирует больше обратных вызовов (см. run_forever ). Однако l oop не может быть запущен, пока ваш обработчик сигнала не вернется. Это означает, что вы не можете ждать, пока l oop не остановится в обработчике сигнала. Вместо этого вы можете запланировать другую задачу, которая будет ожидать, пока ваши долгосрочные задачи отреагируют на изменение в self._shutdown, а затем остановится на l oop.

class Looper:
    ...

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info("Received shutdown-signal: %s (%s)", sig, name)
        self._shutdown = True

        pending = asyncio.all_tasks(loop=self._loop)
        _log.info("Attempting to gather pending tasks: " + str(pending))
        if pending:
            self._loop.create_task(self._wait_for_stop(pending))

    async def _wait_for_stop(self, tasks):
        await asyncio.gather(*tasks)
        self._loop.stop()  # << Stopping the event loop here.
        _log.info("Loop stop initiated.")

    ...

Еще одна вещь, которую следует упомянуть, - это документация говорит, что signal.signal() обработчики not allowed взаимодействуют с l oop, без указания причины ( см. )

0 голосов
/ 03 мая 2020

Нашел решение, которое будет вызывать self._l oop .stop () из асинхронной функции c, которая сначала будет ожидать выполнения всех других задач. Обратите внимание, что это не ждет себя! Если попытаться, программа заблокируется.

Кроме того, asyncio.wait_for (..) допускают тайм-ауты.

import asyncio
import signal
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s [%(name)s]: %(message)s', datefmt='%H:%M:%S')
_log = logging.getLogger(__name__)


class Looper:
    def __init__(self, loop):
        self._loop = loop
        self._shutdown = False
        signal.signal(signal.SIGINT, self._exit)
        signal.signal(signal.SIGTERM, self._exit)

    async def _a_exit(self):
        self._shutdown = True
        my_task = asyncio.current_task()
        pending = list(filter(lambda x: x is not my_task, asyncio.all_tasks(loop=self._loop)))
        waiters = [asyncio.wait_for(p, timeout = 1.5, loop=self._loop) for p in pending]
        results = await asyncio.gather(*waiters, loop=self._loop, return_exceptions=True)
        n_failure = len(list(filter(lambda x: isinstance(x, Exception), results)))
        _log.info(f"{n_failure} failed processes when quick-gathering the remaining {len(results)} tasks. Stopping loop now.")
        self._loop.stop()

    def _exit(self, sig, frame):
        name = signal.Signals(sig).name
        _log.info(f"Received shutdown-signal: {sig} ({name})")
        self._loop.create_task(self._a_exit())

    async def thumper(self, id, t):
        print(f"{id}: Winding up...")
        while not self._shutdown:
            await asyncio.sleep(t)
            print(f'{id}: Thump!')
        print(f'{id}: Thud.')


loop = asyncio.get_event_loop()
lp = Looper(loop)
loop.create_task(lp.thumper('North Hall', 1))
loop.create_task(lp.thumper('South Hall', 2))
loop.create_task(lp.thumper(' West Hall', 3))
loop.create_task(lp.thumper(' East Hall', 4))
loop.run_forever()
_log.info("Done.")

Вкл. Windows 10 это может привести к выводу

North Hall: Winding up...
South Hall: Winding up...
 West Hall: Winding up...
 East Hall: Winding up...
North Hall: Thump!
South Hall: Thump!
[..]
South Hall: Thump!
North Hall: Thump!
14:20:59 INFO [__main__]: Received shutdown-signal: 2 (SIGINT)
 West Hall: Thump!
 West Hall: Thud.
North Hall: Thump!
North Hall: Thud.
South Hall: Thump!
South Hall: Thud.
14:21:01 INFO [__main__]: 1 failed processes when quick-gathering the remaining 4 tasks. Stopping loop now.
14:21:01 INFO [__main__]: Done.

Неудачный процесс стал жертвой тайм-аута.

Обратите внимание, что это решает мою проблему. Однако вопрос о том, почему l oop .run_until_complete (..) не удается после вызова l oop .stop () , остается открытым.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...