Дистиллированный, мой простой пример использования - это набор блокирующих вызовов "старой школы", которые я заключаю в run_in_executor()
и внешнюю сопрограмму;если какой-либо из этих вызовов идет не так, я хочу остановить выполнение, отменить все еще не выполненные вызовы
Это не может работать как предполагалось, потому что run_in_executor
передает функцию в пул потоков и потоки ОСне может быть отменено в Python (или в других языках, которые их выставляют).Отмена будущего, возвращенного run_in_executor
, попытается отменить базовый concurrent.futures.Future
, но это будет иметь эффект, только если функция блокировки еще не запущена, например, потому что пул потоков занят.Как только он начинает выполняться, его нельзя безопасно отменить.Поддержка безопасного и надежного отмены является одним из преимуществ использования asyncio
по сравнению с потоками.
Если вы имеете дело с синхронным кодом, будь то устаревший блокирующий вызов или более продолжительный код с привязкой к ЦП, выследует запустить его с run_in_executor
и включить способ его прерывания.Например, код может иногда проверять флаг stop_requested
и выходить, если это правда, возможно, вызывая исключение.Затем вы можете «отменить» эти задачи, установив соответствующий флаг или флаги.
как мне изменить приведенный выше код таким образом, чтобы при включенном вызове close () не возникало?
Насколько я могу судить, в настоящее время нет способа сделать это без изменений blocking
и кода верхнего уровня.run_in_executor
будет настаивать на информировании цикла событий о результате, и это завершится неудачно, когда цикл событий будет закрыт.Не помогает, что будущее asyncio отменено, потому что проверка отмены выполняется в потоке цикла событий, и ошибка возникает до этого, когда рабочий поток вызывает call_soon_threadsafe
.(Возможно, можно перенести чек в рабочий поток, но следует тщательно проанализировать, приводит ли он к состоянию гонки между вызовом cancel()
и фактической проверкой.)
Почемуэто поднять вообще?что еще нужно циклу от coros / tasks: они либо вышли;поднял;или были отменены: разве этого недостаточно, чтобы сохранить его счастливым?
Требуется, чтобы блокирующие функции были переданы в run_in_executor
(буквально называемый blocking
в вопросе), которые уже начали завершатьсяработает до закрытия цикла событий.Вы отменили будущее asyncio, но основное параллельное будущее все еще хочет «позвонить домой», обнаружив, что цикл замкнут.
Неясно, является ли это ошибкой в asyncio, или вы просто не должнызакрывайте цикл обработки событий, пока не убедитесь, что вся работа, отправленная в run_in_executor
, выполнена.Это требует следующих изменений:
- Не пытайтесь отменить ожидающие фьючерсы.Отмена их выглядит корректно внешне, но не позволяет вам иметь возможность
wait()
для этих фьючерсов, так как asyncio посчитает их завершенными. - Вместо этого отправьте специфичное для приложения событие своим фоновым задачам, сообщая им, что онинеобходимо прервать.
- Позвонить
loop.run_until_complete(asyncio.wait(pending))
до loop.close()
.
С этими изменениями (за исключением события для конкретного приложения - я просто позволю sleep()
s закончить ихконечно), исключение не появилось.
что на самом деле произойдет, если я не позвоню close()
- в этом тривиальном случае я предполагаю, что оно в значительной степени избыточно;но каковы могут быть последствия в "реальном" производственном коде?
Поскольку типичный цикл обработки событий работает столько времени, сколько длится приложение, не должно быть проблем с не вызовом close()
в самом концепрограммы.Операционная система все равно очистит ресурсы при выходе из программы.
Вызов loop.close()
важен для циклов событий с четким временем жизни.Например, библиотека может создать новый цикл событий для конкретной задачи, запустить его в выделенном потоке и избавиться от него.Невозможность закрыть такой цикл может привести к утечке его внутренних ресурсов (например, канала, который он использует для пробуждения между потоками) и вызвать сбой программы.Другим примером являются наборы тестов, которые часто запускают новый цикл событий для каждого модульного теста, чтобы обеспечить разделение тестовых сред.
EDIT: I
подал ошибку для этой проблемы.
РЕДАКТИРОВАТЬ 2: Ошибка была
исправлена разработчиками.