Вдохновленный этим решением Я пытаюсь настроить многопроцессорный пул рабочих процессов в Python.Идея состоит в том, чтобы передать некоторые данные рабочим процессам до того, как они действительно начнут свою работу, и в конце концов повторно использовать их.Он предназначен для минимизации объема данных, которые необходимо упаковать / распаковать для каждого вызова в рабочий процесс (т. Е. Снизить накладные расходы на межпроцессное взаимодействие).Мой MCVE выглядит следующим образом:
import multiprocessing as mp
import numpy as np
def create_worker_context():
global context # create "global" context in worker process
context = {}
def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
context.update({
'worker_id': worker_id,
'some_const_array': some_const_array,
'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
}) # store context information in global namespace of worker
return True # return True, verifying that the worker process received its data
class data_analysis:
def __init__(self):
self.DTYPE = 'float32'
self.CPU_LEN = mp.cpu_count()
self.DIMS = 100
self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
# Init multiprocessing pool
self.cpu_pool = mp.Pool(processes = self.CPU_LEN, initializer = create_worker_context) # create pool and context in workers
pool_results = [
self.cpu_pool.apply_async(
init_worker_context,
args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
) for core_id in range(self.CPU_LEN)
] # pass information to workers' context
result_batches = [result.get() for result in pool_results] # check if they got the information
if not all(result_batches): # raise an error if things did not work
raise SyntaxError('Workers could not be initialized ...')
@staticmethod
def process_batch(batch_data):
context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
return context['tmp'] # return result
def process_all(self):
input_data = np.arange(0, self.DIMS ** 2, dtype = self.DTYPE).reshape(self.DIMS, self.DIMS)
pool_results = [
self.cpu_pool.apply_async(
data_analysis.process_batch,
args = (input_data,)
) for _ in range(self.CPU_LEN)
] # let workers actually work
result_batches = [result.get() for result in pool_results]
for batch in result_batches[1:]:
np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
print(result_batches[0]) # show result
if __name__ == '__main__':
data_analysis().process_all()
Я запускаю вышеупомянутое с CPython 3.6.6.
Странная вещь ... иногда это работает, иногда нет.Если это не работает, метод process_batch
выдает исключение, потому что он не может найти some_const_array
в качестве ключа в context
.Полный возврат выглядит следующим образом:
(env) me@box:/path> python so.py
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/path/so.py", line 37, in process_batch
context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
KeyError: 'some_const_array'
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/so.py", line 54, in <module>
data_analysis().process_all()
File "/path/so.py", line 48, in process_all
result_batches = [result.get() for result in pool_results]
File "/path/so.py", line 48, in <listcomp>
result_batches = [result.get() for result in pool_results]
File "/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
KeyError: 'some_const_array'
Я озадачен.Что здесь происходит?
Если в моих словарях context
содержится объект "более высокого типа", например, драйвер базы данных или аналогичный, я не получаю такой проблемы.Я могу воспроизвести это, только если мои context
словари содержат базовые типы данных Python, коллекции или массивы.
(Существует ли потенциально лучший подход для достижения того же результата более надежным способом? Я знаю, что мой подход считается взломать ...)