Когда подходящее время для вызова loop.close ()? - PullRequest
0 голосов
/ 10 мая 2018

Я немного поэкспериментировал с asyncio и прочитал PEPs ; несколько уроков; и даже книга О'Рейли .

Я думаю, что понял это, но я все еще озадачен поведением loop.close(), которое я не могу понять, когда его "безопасно" вызывать.

Дистиллированный, мой простой пример использования - это набор блокирующих вызовов "старой школы", которые я заключаю в run_in_executor() и внешнюю сопрограмму; если какой-либо из этих вызовов будет неправильным, я хочу остановить выполнение, отменить все еще не выполненные вызовы, распечатать разумный журнал и затем (надеюсь, чисто) уйти с дороги.

Скажи, что-то вроде этого:

import asyncio
import time


def blocking(num):
    time.sleep(num)
    if num == 2:
        raise ValueError("don't like 2")
    return num


async def my_coro(loop, num):
    try:
        result = await loop.run_in_executor(None, blocking, num)
        print(f"Coro {num} done")
        return result
    except asyncio.CancelledError:
        # Do some cleanup here.
        print(f"man, I was canceled: {num}")


def main():
    loop = asyncio.get_event_loop()
    tasks = []
    for num in range(5):
        tasks.append(loop.create_task(my_coro(loop, num)))

    try:
        # No point in waiting; if any of the tasks go wrong, I
        # just want to abandon everything. The ALL_DONE is not
        # a good solution here.
        future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        done, pending = loop.run_until_complete(future)
        if pending:
            print(f"Still {len(pending)} tasks pending")
            # I tried putting a stop() - with/without a run_forever()
            # after the for - same exception raised.
            #  loop.stop()
            for future in pending:
                future.cancel()

        for task in done:
            res = task.result()
            print("Task returned", res)
    except ValueError as error:
        print("Outer except --", error)
    finally:
        # I also tried placing the run_forever() here,
        # before the stop() - no dice.
        loop.stop()
        if pending:
            print("Waiting for pending futures to finish...")
            loop.run_forever()
        loop.close()

Я пробовал несколько вариантов вызовов stop() и run_forever(), кажется, "run_forever сначала, затем остановка" - это тот, который следует использовать согласно для pydoc и без вызова close() дает удовлетворительное:

Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3

Process finished with exit code 0

Однако, когда добавляется вызов close() (как показано выше), я получаю два исключения:

exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

, что в лучшем случае раздражает, но для меня оно совершенно озадачивает: и, что еще хуже, я не смог понять, каким будет Правильный Путь в такой ситуации.

Итак, два вопроса:

  • что мне не хватает? как мне изменить код выше, чтобы при вызове close() не возникало?

  • что на самом деле происходит, если я не звоню close() - в этом тривиальном случае я предполагаю, что это в значительной степени избыточно; но каковы могут быть последствия в «реальном» производственном коде?

Для моего личного удовлетворения также:

  • почему он вообще поднимается? что еще нужно циклу от coros / tasks: они либо вышли; поднял; или были отменены: разве этого недостаточно, чтобы сделать его счастливым?

Заранее большое спасибо за любые ваши предложения!

Ответы [ 2 ]

0 голосов
/ 12 мая 2018

Пока проблема upstream не будет устранена, еще один способ обойти эту проблему - заменить использование run_in_executor пользовательской версией без изъянов.В то время как сворачивание собственного run_in_executor поначалу звучит как плохая идея, на самом деле это всего лишь небольшой клей между concurrent.futures и asyncio Будущее.

Простая версия run_in_executor может быть чисто реализована с использованием общедоступного API этих двух классов:

def run_in_executor(executor, fn, *args):
    """Submit FN to EXECUTOR and return an asyncio future."""
    loop = asyncio.get_event_loop()
    if args:
        fn = functools.partial(fn, *args)
    work_future = executor.submit(fn)
    aio_future = loop.create_future()
    aio_cancelled = False

    def work_done(_f):
        if not aio_cancelled:
            loop.call_soon_threadsafe(set_result)

    def check_cancel(_f):
        nonlocal aio_cancelled
        if aio_future.cancelled():
            work_future.cancel()
            aio_cancelled = True

    def set_result():
        if work_future.cancelled():
            aio_future.cancel()
        elif work_future.exception() is not None:
            aio_future.set_exception(work_future.exception())
        else:
            aio_future.set_result(work_future.result())

    work_future.add_done_callback(work_done)
    aio_future.add_done_callback(check_cancel)

    return aio_future

Когда loop.run_in_executor(blocking) заменяется на run_in_executor(executor, blocking), executor является ThreadPoolExecutor, созданный в main(), код работает без других модификаций.

Конечно, в этом варианте синхронные функции продолжат выполняться в другом потоке до завершения, несмотря на то, что будут отменены - ноэто неизбежно без изменения их для поддержки явного прерывания.

0 голосов
/ 11 мая 2018

Дистиллированный, мой простой пример использования - это набор блокирующих вызовов "старой школы", которые я заключаю в run_in_executor() и внешнюю сопрограмму;если какой-либо из этих вызовов идет не так, я хочу остановить выполнение, отменить все еще не выполненные вызовы

Это не может работать как предполагалось, потому что run_in_executor передает функцию в пул потоков и потоки ОСне может быть отменено в Python (или в других языках, которые их выставляют).Отмена будущего, возвращенного run_in_executor, попытается отменить базовый concurrent.futures.Future, но это будет иметь эффект, только если функция блокировки еще не запущена, например, потому что пул потоков занят.Как только он начинает выполняться, его нельзя безопасно отменить.Поддержка безопасного и надежного отмены является одним из преимуществ использования asyncio по сравнению с потоками.

Если вы имеете дело с синхронным кодом, будь то устаревший блокирующий вызов или более продолжительный код с привязкой к ЦП, выследует запустить его с run_in_executor и включить способ его прерывания.Например, код может иногда проверять флаг stop_requested и выходить, если это правда, возможно, вызывая исключение.Затем вы можете «отменить» эти задачи, установив соответствующий флаг или флаги.

как мне изменить приведенный выше код таким образом, чтобы при включенном вызове close () не возникало?

Насколько я могу судить, в настоящее время нет способа сделать это без изменений blocking и кода верхнего уровня.run_in_executor будет настаивать на информировании цикла событий о результате, и это завершится неудачно, когда цикл событий будет закрыт.Не помогает, что будущее asyncio отменено, потому что проверка отмены выполняется в потоке цикла событий, и ошибка возникает до этого, когда рабочий поток вызывает call_soon_threadsafe.(Возможно, можно перенести чек в рабочий поток, но следует тщательно проанализировать, приводит ли он к состоянию гонки между вызовом cancel() и фактической проверкой.)

Почемуэто поднять вообще?что еще нужно циклу от coros / tasks: они либо вышли;поднял;или были отменены: разве этого недостаточно, чтобы сохранить его счастливым?

Требуется, чтобы блокирующие функции были переданы в run_in_executor (буквально называемый blocking в вопросе), которые уже начали завершатьсяработает до закрытия цикла событий.Вы отменили будущее asyncio, но основное параллельное будущее все еще хочет «позвонить домой», обнаружив, что цикл замкнут.

Неясно, является ли это ошибкой в ​​asyncio, или вы просто не должнызакрывайте цикл обработки событий, пока не убедитесь, что вся работа, отправленная в run_in_executor, выполнена.Это требует следующих изменений:

  • Не пытайтесь отменить ожидающие фьючерсы.Отмена их выглядит корректно внешне, но не позволяет вам иметь возможность wait() для этих фьючерсов, так как asyncio посчитает их завершенными.
  • Вместо этого отправьте специфичное для приложения событие своим фоновым задачам, сообщая им, что онинеобходимо прервать.
  • Позвонить loop.run_until_complete(asyncio.wait(pending)) до loop.close().

С этими изменениями (за исключением события для конкретного приложения - я просто позволю sleep() s закончить ихконечно), исключение не появилось.

что на самом деле произойдет, если я не позвоню close() - в этом тривиальном случае я предполагаю, что оно в значительной степени избыточно;но каковы могут быть последствия в "реальном" производственном коде?

Поскольку типичный цикл обработки событий работает столько времени, сколько длится приложение, не должно быть проблем с не вызовом close() в самом концепрограммы.Операционная система все равно очистит ресурсы при выходе из программы.

Вызов loop.close() важен для циклов событий с четким временем жизни.Например, библиотека может создать новый цикл событий для конкретной задачи, запустить его в выделенном потоке и избавиться от него.Невозможность закрыть такой цикл может привести к утечке его внутренних ресурсов (например, канала, который он использует для пробуждения между потоками) и вызвать сбой программы.Другим примером являются наборы тестов, которые часто запускают новый цикл событий для каждого модульного теста, чтобы обеспечить разделение тестовых сред.


EDIT: I подал ошибку для этой проблемы.
РЕДАКТИРОВАТЬ 2: Ошибка была исправлена ​​ разработчиками.
...