Мне нужно выполнить ~ 18000 довольно дорогих вычислений на суперкомпьютере, и я пытаюсь выяснить, как распараллелить код.В основном он работал с multiprocessing.Process , но он зависал на шаге .join (), если я выполнил более ~ 350 вычислений.
Один из компьютерных ученых, управляющих суперкомпьютером, рекомендовал использовать multiprocessing.Pool вместо Process.
При использовании Process я бы настроил Выходную очередь и список процессов, затем запустил и присоединился к таким процессам:
output = mp.Queue()
processes = [mp.Process(target=some_function,args=(x,output)) for x in some_array]
for p in processes:
p.start()
for p in processes:
p.join()
Поскольку processes
- это список, онявляется итеративным, и я могу использовать output.get()
внутри списка, чтобы получить все результаты:
result = [output.get() for p in processes]
Что эквивалентно этому при использовании пула?Если пул не повторяется, как я могу получить выходные данные каждого процесса, который находится внутри него?
Вот моя попытка с фиктивными данными и фиктивными расчет:
import pandas as pd
import multiprocessing as mp
##dummy function
def predict(row,output):
calc = [len(row.c1)**2,len(row.c2)**2]
output.put([row.c1+' - '+row.c2,sum(calc)])
#dummy data
c = pd.DataFrame(data=[['a','bb'],['ccc','dddd'],['ee','fff'],['gg','hhhh'],['i','jjj']],columns=['c1','c2'])
if __name__ == '__main__':
#output queue
print('initializing output container...')
output = mp.Manager().Queue()
#pool of processes
print('initializing and storing calculations...')
pool = mp.Pool(processes=5)
for i,row in c.iterrows(): #try some smaller subsets here
pool.apply_async(predict,args=(row,output))
#run processes and keep a counter-->I'm not sure what replaces this with Pool!
#for p in processes:
# p.start()
##exit completed processes-->or this!
#for p in processes:
# p.join()
#pool.close() #is this right?
#pool.join() #this?
#store each calculation
print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
print(p)
Вывод, который я получаю:
initializing output container...
initializing and storing calculations...
storing output of calculations...
Traceback (most recent call last):
File "parallel_test.py", line 37, in <module>
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
TypeError: 'Pool' object is not iterable
Что я хочу, чтобы p
напечатал и выглядел как:
0 1
0 a - bb 5
1 ccc - dddd 25
2 ee - fff 13
3 gg - hhhh 20
4 i - jjj 10
Как я могу получить результат каждого расчета, а не только первый?