Как я могу использовать многопроцессорный пул и очередь вместе? - PullRequest
0 голосов
/ 14 декабря 2018

Мне нужно выполнить ~ 18000 довольно дорогих вычислений на суперкомпьютере, и я пытаюсь выяснить, как распараллелить код.В основном он работал с multiprocessing.Process , но он зависал на шаге .join (), если я выполнил более ~ 350 вычислений.

Один из компьютерных ученых, управляющих суперкомпьютером, рекомендовал использовать multiprocessing.Pool вместо Process.

При использовании Process я бы настроил Выходную очередь и список процессов, затем запустил и присоединился к таким процессам:

output = mp.Queue()
processes = [mp.Process(target=some_function,args=(x,output)) for x in some_array]
for p in processes:
    p.start()
for p in processes:
    p.join()

Поскольку processes - это список, онявляется итеративным, и я могу использовать output.get() внутри списка, чтобы получить все результаты:

result = [output.get() for p in processes]

Что эквивалентно этому при использовании пула?Если пул не повторяется, как я могу получить выходные данные каждого процесса, который находится внутри него?

Вот моя попытка с фиктивными данными и фиктивными расчет:

import pandas as pd
import multiprocessing as mp

##dummy function
def predict(row,output):
    calc = [len(row.c1)**2,len(row.c2)**2]
    output.put([row.c1+' - '+row.c2,sum(calc)])

#dummy data
c = pd.DataFrame(data=[['a','bb'],['ccc','dddd'],['ee','fff'],['gg','hhhh'],['i','jjj']],columns=['c1','c2'])

if __name__ == '__main__':
    #output queue
    print('initializing output container...')
    output = mp.Manager().Queue()


    #pool of processes
    print('initializing and storing calculations...')
    pool = mp.Pool(processes=5)
    for i,row in c.iterrows(): #try some smaller subsets here
         pool.apply_async(predict,args=(row,output))

    #run processes and keep a counter-->I'm not sure what replaces this with Pool!
    #for p in processes:
    #    p.start()

    ##exit completed processes-->or this!
    #for p in processes:
    #    p.join()

    #pool.close() #is this right?
    #pool.join() #this?

#store each calculation
print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
print(p)

Вывод, который я получаю:

initializing output container...
initializing and storing calculations...
storing output of calculations...
Traceback (most recent call last):
  File "parallel_test.py", line 37, in <module>
    p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable
TypeError: 'Pool' object is not iterable

Что я хочу, чтобы p напечатал и выглядел как:

        0   1
0      a - bb   5
1  ccc - dddd  25
2    ee - fff  13
3   gg - hhhh  20
4     i - jjj  10

Как я могу получить результат каждого расчета, а не только первый?

1 Ответ

0 голосов
/ 15 декабря 2018

Даже если вы сохраняете все свои полезные результаты в очереди output, вы хотите получить результаты, вызвав output.get() количество раз, которое оно было сохранено в output (количество примеров обучения - len(c) втвой случай).Для меня это работает, если вы измените строку:

print('storing output of calculations...')
p = pd.DataFrame([output.get() for p in pool]) ## <-- this is where the code breaks because pool is not iterable

на:

print('storing output of calculations...')
    p = pd.DataFrame([output.get() for _ in range(len(c))]) ## <-- no longer breaks 
...