Как отключить асинхронный цикл из синхронного контекста с помощью различных механизмов отключения - PullRequest
1 голос
/ 03 июня 2019

Я пытаюсь предоставить функцию синхронного выключения, которая может изящно завершить работу приложения asyncio с помощью сигнала SIGTERM или KeyboardInterrupt SystemExit или просто вызвать функцию напрямую из-за плохого состояния запуска.Я должен закрыть различные задачи, каждая из которых имеет свой собственный способ выключения:

  • aiohttp AppRunner, в настоящее время убитый с помощью метода shutdown, который возвращает сопрограмму, которую нужно ожидать
  • асинхронный APScheduler, в настоящее время уничтоженный с помощью метода shutdown, который вызывает call_soon_threadsafe в текущем цикле событий
  • простой асинхронный цикл, который выполняется навсегда, в настоящее время уничтожается с помощью сигнала cancel в задаче
  • aiohttp ClientSession, который отменяется с помощью метода close в сеансе

Я хочу убить процессор сообщений и игнорировать все новые поступающие сообщения, планировщик, норазрешить выполнение любых задач, которые в данный момент выполняются и зависят от aiohttp ClientSession

Ниже приведено сокращение текущего кода и некоторые комментарии для уточнения логики:

message_processor_future = loop.create_task(message_processor())

def sig_term_handler(_, __):
    logging.info("SIGTERM received, shutting down server...")
    shutdown_server(
        http_runner=http_runner,
        scheduler=scheduler,
        message_processor_future=message_processor_future
    )
signal.signal(signal.SIGTERM, sig_term_handler)

try:
    loop.run_until_complete(message_processor_future)
except (KeyboardInterrupt, SystemExit) as e:
    logging.info("{} received".format(e.__class__.__name__))
    shutdown_server(
        http_runner=http_runner,
        scheduler=scheduler,
        message_processor_future=message_processor_future
    )

async def message_processor():
    while True:
        try:
            # code
        except CancelledError:
            logging.info("Cancelling message processing...")
            return

def shutdown_server(
    http_runner: AppRunner = None,
    scheduler: AsyncIOScheduler = None,
    message_processor_future: asyncio.Task = None
):
    loop = asyncio.get_event_loop()
    # Try to shutdown to the message processor as early as possible so we don't get any new messages
    if message_processor_future:
        logging.info("Cancelling message processor...")
        message_processor_future.cancel()
    # Shutdown apscheduler early to make sure we don't schedule any new tasks
    if scheduler:
        logging.info("Shutting down scheduler...")
        scheduler.shutdown()
    # if the server is running then kill it (this doesn't really have any requirements as it's fairly separate from the application)
    if http_runner:
        logging.info("Shutting down http server...")
        loop.run_until_complete(http_runner.cleanup())

    logging.info(
        f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
    )
    # wait for any tasks spawned by apscheduler to finish and the message processor to die if it's still running
    loop.run_until_complete(
        asyncio.wait(asyncio.Task.all_tasks(loop), timeout=10)
    )

    logging.info("Closing ingest api client...")
    from collector.tasks.ap_associations import api_client
    # Kill the client session as the tasks that use ClientSession have completed
    loop.run_until_complete(api_client.session.close())

    logging.info("Shutting down process...")
    exit(0)

Когда яотмените приложение с помощью KeyboardInterrupt или SystemExit, оно очищается без каких-либо проблем, это связано с тем, что я считаю, что цикл прекратился, поэтому вызовы loop.run_until_complete безопасны и синхронизированыХронично, но при получении SIGTERM цикл все еще работает, поэтому я получаю это исключение

[2019-06-03 14:52:26,985] [    INFO] --- Shutting down http server...
[2019-06-03 14:52:26,985] [   ERROR] --- Exception in callback Loop._read_from_self
handle: <Handle Loop._read_from_self>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 67, in uvloop.loop.Handle._run
  File "uvloop/loop.pyx", line 324, in uvloop.loop.Loop._read_from_self
  File "uvloop/loop.pyx", line 329, in uvloop.loop.Loop._invoke_signals
  File "uvloop/loop.pyx", line 304, in uvloop.loop.Loop._ceval_process_signals
  File "/opt/collector/collector/__main__.py", line 144, in sig_term_handler
    message_processor_future=message_processor_future
  File "/opt/collector/collector/__main__.py", line 192, in shutdown_server
    loop.run_until_complete(http_runner.cleanup())
  File "uvloop/loop.pyx", line 1440, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1433, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1342, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 445, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

Это имеет смысл, но я не совсем уверен, как настроить метод shutdown для обработки этого состоянияЯ пытался использовать метод add_done_callback, но, похоже, это тоже не сработало, потому что приложение застревало в цикле while, ожидая завершения или отмены всех задач.

def shutdown_server(
    http_runner: AppRunner = None,
    scheduler: AsyncIOScheduler = None,
    message_processor_future: asyncio.Task = None
):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        task_runner = loop.create_task
    else:
        task_runner = loop.run_until_complete

    if message_processor_future:
        logging.info("Cancelling message processor...")
        message_processor_future.cancel()

    if scheduler:
        logging.info("Shutting down scheduler...")
        scheduler.shutdown()

    if http_runner:
        logging.info("Shutting down http server...")
        task_runner(http_runner.shutdown())

    logging.info(
        f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
    )

    def finish_shutdown():
        task_runner(http_runner.cleanup())
        logging.info("Closing ingest api client...")
        from collector.tasks.ap_associations import api_client
        task_runner(api_client.session.close())

        logging.info("Shutting down process...")
        exit(0)

    if loop.is_running():
        all_tasks_complete = loop.create_task(asyncio.wait(
            asyncio.Task.all_tasks(loop), timeout=10
        ))
        all_tasks_complete.add_done_callback(finish_shutdown)
        while not all_tasks_complete.done() and not all_tasks_complete.cancelled():
            pass
    else:
        loop.run_until_complete(asyncio.wait(
            asyncio.Task.all_tasks(loop), timeout=10
        ))
        finish_shutdown()

1 Ответ

1 голос
/ 26 июня 2019

Я понял, что вы можете просто вызвать sys.exit в обработчике сигнала, и цикл получит исключение SystemExit и продолжит выполнение оставшейся части предложения catch с остановленным циклом.

т.е.

signal.signal(signal.SIGTERM, lambda _, __: sys.exit(0))

, что позволяет мне реорганизовать код, чтобы он был намного чище, и я также могу заставить задачи обрабатывать свои собственные исключения с помощью этого шаблона:

try:
    loop.run_forever()
except (KeyboardInterrupt, SystemExit) as e:
    logging.info(f"{e.__class__.__name__} received")
except Exception as e:
    exception_manager.handle_exception(e)
finally:
    shutdown(http_server_manager, scheduler)
...