Как прекратить такие длительные вычисления с привязкой к ЦП в методе?
Подход, который вы пробовали, не работает, потому что фьючерсы возвращаются ProcessPoolExecutor
не подлежат отмене.Хотя run_in_executor
Asyncio пытается распространить отмену, просто игнорируется на Future.cancel
, как только задача начинает выполняться.
Нет фундаментальной причины для этого,В отличие от потоков, процессы могут быть безопасно завершены, поэтому для ProcessPoolExecutor.submit
будет вполне возможно вернуть будущее, чье cancel
завершит соответствующий процесс.Сопрограммы Asyncio определили семантику отмены и будут автоматически использовать ее.К сожалению, ProcessPoolExecutor.submit
возвращает обычный concurrent.futures.Future
, который принимает наименьший общий знаменатель и рассматривает текущее будущее как неприкасаемое.
В результате для отмены задач, выполняемых в подпроцессах, одиндолжен обойти ProcessPoolExecutor
в целом и управлять своими собственными процессами.Задача состоит в том, как сделать это без переопределения половины multiprocessing
.Одна из опций, предлагаемых стандартной библиотекой, состоит в том, чтобы (ab) использовать для этой цели multiprocessing.Pool
, поскольку она поддерживает надежное завершение рабочих процессов.A CancellablePool
может работать следующим образом:
- Вместо того, чтобы порождать фиксированное число процессов, порождают фиксированное число пулов из 1 рабочего.
- Назначение задач для пулов из асинхронного режимасопрограммная.Если сопрограмма отменяется в ожидании завершения задачи в другом процессе, завершает пул с одним процессом и создает новый.
- Поскольку все координируется из одного асинхронного потокане беспокойтесь о таких условиях гонки, как случайное завершение процесса, который уже начал выполнять другую задачу.(Это должно быть предотвращено, если кто-то поддержит отмену в
ProcessPoolExecutor
.)
Вот пример реализации этой идеи:
import asyncio
import multiprocessing
class CancellablePool:
def __init__(self, max_workers=3):
self._free = {self._new_pool() for _ in range(max_workers)}
self._working = set()
self._change = asyncio.Event()
def _new_pool(self):
return multiprocessing.Pool(1)
async def apply(self, fn, *args):
"""
Like multiprocessing.Pool.apply_async, but:
* is an asyncio coroutine
* terminates the process if cancelled
"""
while not self._free:
await self._change.wait()
self._change.clear()
pool = usable_pool = self._free.pop()
self._working.add(pool)
loop = asyncio.get_event_loop()
fut = loop.create_future()
def _on_done(obj):
loop.call_soon_threadsafe(fut.set_result, obj)
def _on_err(err):
loop.call_soon_threadsafe(fut.set_exception, err)
pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)
try:
return await fut
except asyncio.CancelledError:
pool.terminate()
usable_pool = self._new_pool()
finally:
self._working.remove(pool)
self._free.add(usable_pool)
self._change.set()
def shutdown(self):
for p in self._working | self._free:
p.terminate()
self._free.clear()
Минималистичный тестовый примерс отображением отмены:
def really_long_process():
print("I am a really long computation.....")
large_val = 9729379273492397293479237492734 ** 344323
print("I finally computed this large value: {}".format(large_val))
async def main():
loop = asyncio.get_event_loop()
pool = CancellablePool()
tasks = [loop.create_task(pool.apply(really_long_process))
for _ in range(5)]
for t in tasks:
try:
await asyncio.wait_for(t, 1)
except asyncio.TimeoutError:
print('task timed out and cancelled')
pool.shutdown()
asyncio.get_event_loop().run_until_complete(main())
Обратите внимание, что загрузка ЦП никогда не превышает 3 ядра, и как он начинает падать ближе к концу теста, указывая, что процессы завершаются, как ожидалось.
Чтобы применить его к коду из вопроса, сделайте self._lmz_executor
экземпляр CancellablePool
и измените self._loop.run_in_executor(...)
на self._loop.create_task(self._lmz_executor.apply(...))
.