Как использовать многопроцессорную обработку для ускорения следующей функции? - PullRequest
6 голосов
/ 15 мая 2019

У меня есть следующее для цикла:

for j in range(len(a_nested_list_of_ints)):
    arr_1_, arr_2_, arr_3_ = foo(a_nested_list_of_ints[j])
    arr_1[j,:] = arr_1_.data.numpy()
    arr_2[j,:] = arr_2_.data.numpy()
    arr_3[j,:] = arr_3_.data.numpy()

Где a_nested_list_of_ints - это вложенный список целых чисел.Однако это занимает много времени, чтобы закончить.Как я могу оптимизировать это через многопроцессорность?До сих пор я пытался использовать multiprocessing

p = Pool(5)
for j in range(len(a_nested_list_of_ints)):
    arr_1_, arr_2_, arr_3_ = p.map(foo,a_nested_list_of_ints[j])
    arr_1[j,:] = arr_1_.data.numpy()
    arr_2[j,:] = arr_2_.data.numpy()
    arr_3[j,:] = arr_3_.data.numpy()

Однако я получаю:

ValueError: not enough values to unpack (expected 3, got 2)

здесь:

    arr_1_, arr_2_, arr_3_ = p.map(foo,a_nested_list_of_ints[j])

Любая идея о том, как сделатьвышеуказанная операция быстрее?Я также даже попробовал с starmap, но он не работает.

Ответы [ 2 ]

4 голосов
/ 16 мая 2019

Вот демоверсия pool, которая работает:

In [11]: def foo(i): 
    ...:     return np.arange(i), np.arange(10-i) 
    ...:                                                                        
In [12]: with multiprocessing.Pool(processes=2) as pool: 
    ...:     x = pool.map(foo, range(10)) 
    ...:                                                                        
In [13]: x                                                                      
Out[13]: 
[(array([], dtype=int64), array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])),
 (array([0]), array([0, 1, 2, 3, 4, 5, 6, 7, 8])),
 (array([0, 1]), array([0, 1, 2, 3, 4, 5, 6, 7])),
 (array([0, 1, 2]), array([0, 1, 2, 3, 4, 5, 6])),
 (array([0, 1, 2, 3]), array([0, 1, 2, 3, 4, 5])),
 (array([0, 1, 2, 3, 4]), array([0, 1, 2, 3, 4])),
 (array([0, 1, 2, 3, 4, 5]), array([0, 1, 2, 3])),
 (array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2])),
 (array([0, 1, 2, 3, 4, 5, 6, 7]), array([0, 1])),
 (array([0, 1, 2, 3, 4, 5, 6, 7, 8]), array([0]))]

pool.map выполняет итерацию, а не какой-то внешний for цикл.

И чтобы немного приблизиться к вашему примеру:

In [14]: def foo(alist): 
    ...:     return np.arange(*alist), np.zeros(alist,int) 
    ...:      
    ...:                                                                        
In [15]: alists=[(0,3),(1,4),(1,6,2)]                                           
In [16]: with multiprocessing.Pool(processes=2) as pool: 
    ...:     x = pool.map(foo, alists) 
    ...:                                                                        
In [17]: x                                                                      
Out[17]: 
[(array([0, 1, 2]), array([], shape=(0, 3), dtype=int64)),
 (array([1, 2, 3]), array([[0, 0, 0, 0]])),
 (array([1, 3, 5]), array([[[0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0]]]))]

Обратите внимание, что pool.map возвращает список со всеми случаями, сгенерированными из alists. Не имеет смысла распаковывать это x.

 x,y = pool.map(...)   # too many values to pack error

Я могу распаковать x, используя идиому zip*:

In [21]: list(zip(*x))                                                          
Out[21]: 
[(array([0, 1, 2]), array([1, 2, 3]), array([1, 3, 5])),
 (array([], shape=(0, 3), dtype=int64), array([[0, 0, 0, 0]]), array([[[0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0],
          [0, 0]]]))]

Это список из 2 кортежей; в действительности список версий транспонирования. Это может быть распаковано:

In [23]: y,z = zip(*x)                                                          
In [24]: y                                                                      
Out[24]: (array([0, 1, 2]), array([1, 2, 3]), array([1, 3, 5]))
In [25]: z                                                                      
Out[25]: 
(array([], shape=(0, 3), dtype=int64), array([[0, 0, 0, 0]]), array([[[0, 0],
         [0, 0],
         [0, 0],
         [0, 0],
         [0, 0],
         [0, 0]]]))
0 голосов
/ 16 мая 2019

Вот реализация для многопроцессорной обработки, которую я использую довольно часто.Он разделит ваш список, в данном случае a_nested_list_of_ints, на сколько угодно ядер.Затем он запускает вашу функцию foo для каждого из разделенных списков, по одному списку на ядро.

def split_list(all_params, instances):
    return list(np.array_split(all_params, instances))

# split the list up into equal chucks for each core
n_proc = multiprocessing.cpu_count()
split_items = split_list(to_calc, n_proc)

# create the multiprocessing pool
pool = Pool(processes=n_proc)
all_calcs = []
for i in range(n_proc):
    # the arguments to the foo definition have to be a tuple - (split[i],)
    async_calc = pool.apply_async(foo, (split_items[i],))
    all_calcs.append(async_calc)

pool.close()
pool.join()

# get results
all_results = []
for result in all_calcs:
    all_results += result.get()

print(all_results)
...