Как правильно прервать и прекратить многопроцессорную работу. Пул заданий - PullRequest
0 голосов
/ 02 апреля 2020

Я создаю Flask приложение, которое отвечает на определенные запросы, начиная новый поток для выполнения работы. Затем этот новый поток создает multiprocessing.Pool рабочих для параллельной обработки нескольких несвязанных входных данных (выходные данные одного процесса не влияют на другие процессы).

Мне нужен способ прервать эту работу и полностью завершить рабочие и multiprocessing.Pool. Просматривая документацию, я не смог определить, как остановить рабочих или прекратить пул.

Вот минимальный рабочий пример, демонстрирующий проблему.

Это файл с рабочим class.

# sub.py
import logging
import multiprocessing
import psutil
import threading
import time

LOGGER = logging.getLogger(__name__)
N_CORES = psutil.cpu_count(logical=False)


class WorkerClass():
    def __init__(self, input_values):
        self.input_values = input_values
        self.thread = threading.Thread(target=self.run, args=())

    def run(self):
        LOGGER.debug(f'Starting pool of {N_CORES} processes')
        with multiprocessing.Pool(processes=N_CORES) as pool:
            self.pool = pool
            jobs = {}
            for i in self.input_values:
                jobs[i] = pool.apply_async(self.worker, [i])

            LOGGER.debug(f'{len(jobs)} workers started')

            done = []
            while len(done) < len(jobs):
                for key in jobs:
                    if key not in done:
                        try:
                            res = jobs[key].get(1)
                            LOGGER.debug(f'{key} result: {res}')
                            done.append(key)
                        except multiprocessing.TimeoutError:
                            LOGGER.debug(f'Job still running: {key}')
            LOGGER.debug('Collected all worker jobs.')
        LOGGER.debug('Multiprocessing pool closed.')

    @staticmethod
    def worker(i):
        time.sleep(5)  # some long process
        result = i**2
        LOGGER.info(f'{i} * {i} = {result}')
        return result

А вот и основное приложение.

# main.py
import logging
import time

from sub import WorkerClass

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()


if __name__ == '__main__':
    my_worker = WorkerClass(range(5))
    LOGGER.debug('starting thread')
    my_worker.thread.start()

    # simulate the request to terminate the job
    time.sleep(11)
    my_worker.pool.terminate()
    my_worker.pool.close()

Если я закомментирую часть main.py, которая завершает пул, все потоки заканчиваются sh и больше не отображается в диспетчере задач.

Когда я включаю часть для завершения multiprocess.Pool, программа зависает с постоянными задачами Python в диспетчере задач и следующим выводом:

$ python main.py
DEBUG:root:starting thread
DEBUG:slave:Starting pool of 2 processes
DEBUG:slave:5 workers started
DEBUG:slave:Job still running: 0
DEBUG:slave:Job still running: 1
DEBUG:slave:Job still running: 2
DEBUG:slave:Job still running: 3
DEBUG:slave:Job still running: 4
INFO:slave:0 * 0 = 0
DEBUG:slave:0 result: 0
INFO:slave:1 * 1 = 1
DEBUG:slave:1 result: 1
DEBUG:slave:Job still running: 2
DEBUG:slave:Job still running: 3
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 2
INFO:slave:2 * 2 = 4
INFO:slave:3 * 3 = 9
DEBUG:slave:3 result: 9
DEBUG:slave:Job still running: 4
DEBUG:slave:2 result: 4
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 4
... 
<forever>

Как я могу чисто остановить рабочих и прекратить multiprocessing.Pool?

~~~ Я сдался ~~~

Вместо того, чтобы использовать pool.terminate(), я вместо этого просто прошел multiprocessing.mangager.Event() на рабочую задачу, чтобы сигнализировать, когда остановиться. Вот исправленный код, который завершается, когда я выполняю clear() на Event.

# main.py
import logging
import time

from sub import WorkerClass

logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()


if __name__ == '__main__':
    my_worker = WorkerClass(range(5))
    LOGGER.debug('starting thread')
    my_worker.thread.start()

    # simulate the request to terminate the job
    time.sleep(7)
    my_worker.proceed.clear()  # clear the event flag
# sub.py
import logging
import multiprocessing
import psutil
import threading
import time

LOGGER = logging.getLogger(__name__)
N_CORES = psutil.cpu_count(logical=False)


class WorkerClass():
    def __init__(self, input_values):
        self.input_values = input_values
        self.thread = threading.Thread(target=self.run, args=())

    def run(self):
        LOGGER.debug(f'Starting pool of {N_CORES} processes')
        manager = multiprocessing.Manager()
        self.proceed = manager.Event()  # create an event flag
        self.proceed.set()
        with multiprocessing.Pool(processes=N_CORES) as pool:
            jobs = {}
            for i in self.input_values:
                jobs[i] = pool.apply_async(self.worker, [i, self.proceed])

            LOGGER.debug(f'{len(jobs)} workers started')

            done = []
            while len(done) < len(jobs):
                for key in jobs:
                    if key not in done:
                        try:
                            res = jobs[key].get(1)
                            LOGGER.debug(f'{key} result: {res}')
                            done.append(key)
                        except multiprocessing.TimeoutError:
                            LOGGER.debug(f'Job still running: {key}')
            LOGGER.debug('Collected all worker jobs.')
        LOGGER.debug('Multiprocessing pool closed.')

    @staticmethod
    def worker(i, proceed):
        result = 0
        while result < i**2 and proceed.is_set():  # check that the event is still set
            time.sleep(2)  # some long process
            result += i
        LOGGER.info(f'{i} * {i} = {result}')
        return result
...