Как прекратить длительные вычисления (задача с привязкой к процессору) в Python, используя asyncio и concurrent.futures.ProcessPoolExecutor? - PullRequest
0 голосов
/ 22 октября 2018

Аналогичный вопрос (но у меня ответ не работает): Как отменить длительные подпроцессы, запущенные с использованием concurrent.futures.ProcessPoolExecutor?

В отличие отвопрос, связанный выше, и предоставленное решение, в моем случае само вычисление довольно длинное (с привязкой к ЦП) и не может быть выполнено в цикле, чтобы проверить, произошло ли какое-либо событие.

Сокращенная версия кода ниже:

import asyncio
import concurrent.futures as futures
import time

class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(self._max_execution_time / _sleepy_time)

        self._long_running_tasks.append(
            self._loop.run_in_executor(
                    self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#{}: thinking for {}s. Run {}/{}".format(
                    bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot#{} Finished Thinking".format(bot))

def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(_large_val))

if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()

Идея состоит в том, что есть основной цикл симуляции, который запускает и контролирует три потока бота.Каждый из этих потоков ботов затем выполняет некоторые рассуждения, но также запускает очень длительный фоновый процесс, используя ProcessPoolExecutor, который может закончиться тем, что он будет работать дольше своего собственного порога / максимального времени выполнения для рассуждения о вещах.

Как вы можете видетьв приведенном выше коде я попытался .cancel() выполнить эти задачи, когда истекло время ожидания.Хотя это на самом деле не отменяет фактическое вычисление, которое происходит в фоновом режиме, и цикл asyncio не прерывается до тех пор, пока не завершатся все длительные вычисления.

Как завершить такой долго работающий ЦПвычисления внутри метода?

Другие подобные вопросы SO, но не обязательно связанные или полезные:

  1. asyncio: возможно ли отменить будущеевыполняется исполнителем?
  2. Как завершить одиночную асинхронную задачу в многопроцессорной обработке, если эта одиночная асинхронная задача превышает пороговое время в Python
  3. Асинхронныймногопроцессорная обработка с рабочим пулом в Python: как продолжать работу после истечения времени ожидания?

1 Ответ

0 голосов
/ 23 октября 2018

Как прекратить такие длительные вычисления с привязкой к ЦП в методе?

Подход, который вы пробовали, не работает, потому что фьючерсы возвращаются ProcessPoolExecutor не подлежат отмене.Хотя run_in_executor Asyncio пытается распространить отмену, просто игнорируется на Future.cancel, как только задача начинает выполняться.

Нет фундаментальной причины для этого,В отличие от потоков, процессы могут быть безопасно завершены, поэтому для ProcessPoolExecutor.submit будет вполне возможно вернуть будущее, чье cancel завершит соответствующий процесс.Сопрограммы Asyncio определили семантику отмены и будут автоматически использовать ее.К сожалению, ProcessPoolExecutor.submit возвращает обычный concurrent.futures.Future, который принимает наименьший общий знаменатель и рассматривает текущее будущее как неприкасаемое.

В результате для отмены задач, выполняемых в подпроцессах, одиндолжен обойти ProcessPoolExecutor в целом и управлять своими собственными процессами.Задача состоит в том, как сделать это без переопределения половины multiprocessing.Одна из опций, предлагаемых стандартной библиотекой, состоит в том, чтобы (ab) использовать для этой цели multiprocessing.Pool, поскольку она поддерживает надежное завершение рабочих процессов.A CancellablePool может работать следующим образом:

  • Вместо того, чтобы порождать фиксированное число процессов, порождают фиксированное число пулов из 1 рабочего.
  • Назначение задач для пулов из асинхронного режимасопрограммная.Если сопрограмма отменяется в ожидании завершения задачи в другом процессе, завершает пул с одним процессом и создает новый.
  • Поскольку все координируется из одного асинхронного потокане беспокойтесь о таких условиях гонки, как случайное завершение процесса, который уже начал выполнять другую задачу.(Это должно быть предотвращено, если кто-то поддержит отмену в ProcessPoolExecutor.)

Вот пример реализации этой идеи:

import asyncio
import multiprocessing

class CancellablePool:
    def __init__(self, max_workers=3):
        self._free = {self._new_pool() for _ in range(max_workers)}
        self._working = set()
        self._change = asyncio.Event()

    def _new_pool(self):
        return multiprocessing.Pool(1)

    async def apply(self, fn, *args):
        """
        Like multiprocessing.Pool.apply_async, but:
         * is an asyncio coroutine
         * terminates the process if cancelled
        """
        while not self._free:
            await self._change.wait()
            self._change.clear()
        pool = usable_pool = self._free.pop()
        self._working.add(pool)

        loop = asyncio.get_event_loop()
        fut = loop.create_future()
        def _on_done(obj):
            loop.call_soon_threadsafe(fut.set_result, obj)
        def _on_err(err):
            loop.call_soon_threadsafe(fut.set_exception, err)
        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)

        try:
            return await fut
        except asyncio.CancelledError:
            pool.terminate()
            usable_pool = self._new_pool()
        finally:
            self._working.remove(pool)
            self._free.add(usable_pool)
            self._change.set()

    def shutdown(self):
        for p in self._working | self._free:
            p.terminate()
        self._free.clear()

Минималистичный тестовый примерс отображением отмены:

def really_long_process():
    print("I am a really long computation.....")
    large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(large_val))

async def main():
    loop = asyncio.get_event_loop()
    pool = CancellablePool()

    tasks = [loop.create_task(pool.apply(really_long_process))
             for _ in range(5)]
    for t in tasks:
        try:
            await asyncio.wait_for(t, 1)
        except asyncio.TimeoutError:
            print('task timed out and cancelled')
    pool.shutdown()

asyncio.get_event_loop().run_until_complete(main())

Обратите внимание, что загрузка ЦП никогда не превышает 3 ядра, и как он начинает падать ближе к концу теста, указывая, что процессы завершаются, как ожидалось.

Чтобы применить его к коду из вопроса, сделайте self._lmz_executor экземпляр CancellablePool и измените self._loop.run_in_executor(...) на self._loop.create_task(self._lmz_executor.apply(...)).

...