Трудно сказать, что происходит в вашем алгоритме. Я мало знаю о многопроцессорности, но, вероятно, безопаснее придерживаться функций и не передавать себя в объединенные процессы. Это делается, когда вы передаете args_tmp
в wrapper_
в pool.map()
. Также в целом постарайтесь уменьшить объем данных, передаваемых между родительским и дочерним процессами в целом. Я пытаюсь переместить генерацию списка lp
в рабочие пула, чтобы предотвратить передачу лишних данных.
Наконец, хотя я не думаю, что это имеет значение в этом примере кода, но вы должны очистить его после используя пул или пул с with
.
Я переписал часть вашего кода, чтобы попробовать, и это кажется быстрее, но я не на 100%, что он соответствует вашему алгоритму. Некоторые имена переменных трудно различить guish.
Это работает намного быстрее для меня, но трудно сказать, точно ли оно создает ваши решения. Мой окончательный вывод, если это верно, заключается в том, что дополнительная передача данных значительно замедляла рабочих пула.
#main.py
if __name__ == '__main__':
import os
import sys
file_dir = os.path.dirname(__file__)
sys.path.append(file_dir)
from tmp import generate_1
parallel = True
generate_1(parallel)
#tmp.py
import multiprocessing as mp
import numpy as np
import random
from tqdm import tqdm
from itertools import starmap
def wrapper_(arg):
return arg['self'].generate_array_elements(
nu1=arg['nu1'],
nu2=arg['nu2'],
ii=arg['ii'],
jj=arg['jj'],
lp=arg['self'].lp,
nu1exp=arg['nu1exp'],
nu2exp=arg['nu2exp'],
innt=arg['innt']
)
def generate_1(parallel):
"""create a list that is used to the creation of 2-D array"""
il = np.random.random(256)
"""generating params for parallel data generation"""
"""some params are also calculated here to speed up the calculation process
because they are always the same so they can be calculated just once"""
"""this code creates a list of 256*256 elements"""
args_tmp = [
{
'nu1': nu1,
'nu2': nu2,
'ii': ii,
'jj': jj,
'innt': np.random.random()*nu1+np.random.random()*nu2,
'nu1exp': np.exp(1j*nu1),
'nu2exp': np.exp(1j*nu2),
} for ii, nu1 in enumerate(il) for jj, nu2 in enumerate(il)]
"""init pool"""
"""get list of arrays to generate"""
ip_list = [random.sample((range(256)),int(np.random.random()*12)+1) for ii in range(300)]
map_args = [(idx, ip, args_tmp) for idx, ip in enumerate(ip_list)]
"""separate function to do other important things"""
if parallel:
with mp.Pool(8, maxtasksperchild=10000) as pool:
result = pool.starmap(start_generate_2, map_args)
else:
result = starmap(start_generate_2, map_args)
# Wrap iterator in list call.
return list(result)
def start_generate_2(idx, ip, args_tmp):
print ('starting {idx}'.format(idx=idx))
runner = Runner()
result = runner.generate_2(ip, args_tmp)
print ('finished {idx}'.format(idx=idx))
return result
class Runner():
def generate_2(self, ip, args_tmp):
"""NOTE, the method is much more extensive and uses other methods of the class"""
"""so it must remain a method of the class that is not static!"""
self.lp = [{'a': np.random.random(), 'b': np.random.random()} for ii in ip]
"""this part creates 1-D array of the length of args_tmp, that's 256*256"""
result = map(wrapper_, [dict(args, self=self) for args in args_tmp])
"""it's then reshaped to 2-D array"""
result = np.reshape(list(result), (256,256))
return result
def generate_array_elements(self, nu1, nu2, ii, jj, lp, nu1exp, nu2exp, innt):
"""doing heavy calc"""
""""here is something else"""
if ii > jj: return 0
ll1 = []
ll2 = []
for kk, ll in enumerate(lp):
ll1.append(nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random()))
ll2.append(nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random()))
t1 = sum(ll1)
t2 = sum(ll2)
result = innt*np.abs(t1 - t2)
return result
Я добавляю общий шаблон c, чтобы показать архитектуру, в которой вы отделить подготовку общих аргументов от средства выполнения задач и по-прежнему использовать классы. Стратегия здесь заключалась бы в том, чтобы не создавать слишком много задач (300 кажется быстрее, чем пытаться разделить их до 64000) и не передавать слишком много данных каждой задаче. Интерфейс launch_task должен быть как можно более простым, что в моем рефакторинге вашего кода было бы эквивалентно start_generate_2
.
import multiprocessing
from itertools import starmap
class Launcher():
def __init__(self, parallel):
self.parallel = parallel
def generate_shared_args(self):
return [(i, j) for i, j in enumerate(range(300))]
def launch(self):
shared_args = self.generate_shared_args()
if self.parallel:
with multiprocessing.Pool(8) as pool:
result = pool.starmap(launch_task, shared_args)
else:
result = starmap(launch_task, shared_args)
# Wrap in list to resolve iterable.
return list(result)
def launch_task(i, j):
task = Task(i, j)
return task.run()
class Task():
def __init__(self, i, j):
self.i = i
self.j = j
def run(self):
return self.i + self.j
if __name__ == '__main__':
parallel = True
launcher = Launcher(parallel)
print(launcher.launch())
Здесь есть предупреждение об очистке пула в документации по пулу. : https://docs.python.org/3/library/multiprocessing.html#multiprocessing .pool.Pool
В первом пункте обсуждается отказ от общего состояния и особенно больших объемов данных. https://docs.python.org/3/library/multiprocessing.html#programming - рекомендации