Я создаю Flask приложение, которое отвечает на определенные запросы, начиная новый поток для выполнения работы. Затем этот новый поток создает multiprocessing.Pool
рабочих для параллельной обработки нескольких несвязанных входных данных (выходные данные одного процесса не влияют на другие процессы).
Мне нужен способ прервать эту работу и полностью завершить рабочие и multiprocessing.Pool
. Просматривая документацию, я не смог определить, как остановить рабочих или прекратить пул.
Вот минимальный рабочий пример, демонстрирующий проблему.
Это файл с рабочим class.
# sub.py
import logging
import multiprocessing
import psutil
import threading
import time
LOGGER = logging.getLogger(__name__)
N_CORES = psutil.cpu_count(logical=False)
class WorkerClass():
def __init__(self, input_values):
self.input_values = input_values
self.thread = threading.Thread(target=self.run, args=())
def run(self):
LOGGER.debug(f'Starting pool of {N_CORES} processes')
with multiprocessing.Pool(processes=N_CORES) as pool:
self.pool = pool
jobs = {}
for i in self.input_values:
jobs[i] = pool.apply_async(self.worker, [i])
LOGGER.debug(f'{len(jobs)} workers started')
done = []
while len(done) < len(jobs):
for key in jobs:
if key not in done:
try:
res = jobs[key].get(1)
LOGGER.debug(f'{key} result: {res}')
done.append(key)
except multiprocessing.TimeoutError:
LOGGER.debug(f'Job still running: {key}')
LOGGER.debug('Collected all worker jobs.')
LOGGER.debug('Multiprocessing pool closed.')
@staticmethod
def worker(i):
time.sleep(5) # some long process
result = i**2
LOGGER.info(f'{i} * {i} = {result}')
return result
А вот и основное приложение.
# main.py
import logging
import time
from sub import WorkerClass
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
if __name__ == '__main__':
my_worker = WorkerClass(range(5))
LOGGER.debug('starting thread')
my_worker.thread.start()
# simulate the request to terminate the job
time.sleep(11)
my_worker.pool.terminate()
my_worker.pool.close()
Если я закомментирую часть main.py
, которая завершает пул, все потоки заканчиваются sh и больше не отображается в диспетчере задач.
Когда я включаю часть для завершения multiprocess.Pool
, программа зависает с постоянными задачами Python в диспетчере задач и следующим выводом:
$ python main.py
DEBUG:root:starting thread
DEBUG:slave:Starting pool of 2 processes
DEBUG:slave:5 workers started
DEBUG:slave:Job still running: 0
DEBUG:slave:Job still running: 1
DEBUG:slave:Job still running: 2
DEBUG:slave:Job still running: 3
DEBUG:slave:Job still running: 4
INFO:slave:0 * 0 = 0
DEBUG:slave:0 result: 0
INFO:slave:1 * 1 = 1
DEBUG:slave:1 result: 1
DEBUG:slave:Job still running: 2
DEBUG:slave:Job still running: 3
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 2
INFO:slave:2 * 2 = 4
INFO:slave:3 * 3 = 9
DEBUG:slave:3 result: 9
DEBUG:slave:Job still running: 4
DEBUG:slave:2 result: 4
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 4
DEBUG:slave:Job still running: 4
...
<forever>
Как я могу чисто остановить рабочих и прекратить multiprocessing.Pool
?
~~~ Я сдался ~~~
Вместо того, чтобы использовать pool.terminate()
, я вместо этого просто прошел multiprocessing.mangager.Event()
на рабочую задачу, чтобы сигнализировать, когда остановиться. Вот исправленный код, который завершается, когда я выполняю clear()
на Event
.
# main.py
import logging
import time
from sub import WorkerClass
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
if __name__ == '__main__':
my_worker = WorkerClass(range(5))
LOGGER.debug('starting thread')
my_worker.thread.start()
# simulate the request to terminate the job
time.sleep(7)
my_worker.proceed.clear() # clear the event flag
# sub.py
import logging
import multiprocessing
import psutil
import threading
import time
LOGGER = logging.getLogger(__name__)
N_CORES = psutil.cpu_count(logical=False)
class WorkerClass():
def __init__(self, input_values):
self.input_values = input_values
self.thread = threading.Thread(target=self.run, args=())
def run(self):
LOGGER.debug(f'Starting pool of {N_CORES} processes')
manager = multiprocessing.Manager()
self.proceed = manager.Event() # create an event flag
self.proceed.set()
with multiprocessing.Pool(processes=N_CORES) as pool:
jobs = {}
for i in self.input_values:
jobs[i] = pool.apply_async(self.worker, [i, self.proceed])
LOGGER.debug(f'{len(jobs)} workers started')
done = []
while len(done) < len(jobs):
for key in jobs:
if key not in done:
try:
res = jobs[key].get(1)
LOGGER.debug(f'{key} result: {res}')
done.append(key)
except multiprocessing.TimeoutError:
LOGGER.debug(f'Job still running: {key}')
LOGGER.debug('Collected all worker jobs.')
LOGGER.debug('Multiprocessing pool closed.')
@staticmethod
def worker(i, proceed):
result = 0
while result < i**2 and proceed.is_set(): # check that the event is still set
time.sleep(2) # some long process
result += i
LOGGER.info(f'{i} * {i} = {result}')
return result