Я понял, как выполнить параллелизацию суммы массивов с многопроцессорностью, apply_async и обратными вызовами, поэтому я публикую это здесь для других людей Я использовал пример страницы для Parallel Python для класса обратного вызова Sum, хотя на самом деле я не использовал этот пакет для реализации. Это дало мне идею использовать обратные вызовы, хотя. Вот упрощенный код того, что я в итоге использовал, и он делает то, что хотел.
import multiprocessing
import numpy as np
import thread
class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
def __init__(self):
self.value = np.zeros((1,512*512)) #this is the initialization of the sum
self.lock = thread.allocate_lock()
self.count = 0
def add(self,value):
self.count += 1
self.lock.acquire() #lock so sum is correct if two processes return at same time
self.value += value #the actual summation
self.lock.release()
def computation(index):
array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
return array1
def summers(num_iters):
pool = multiprocessing.Pool(processes=8)
sumArr = Sum() #create an instance of callback class and zero the sum
for index in range(num_iters):
singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)
pool.close()
pool.join() #waits for all the processes to finish
return sumArr.value
Мне также удалось заставить это работать, используя распараллеленную карту, что было предложено в другом ответе. Я пробовал это раньше, но я не реализовал это правильно. Оба способа работают, и я думаю, этот ответ объясняет вопрос о том, какой метод использовать (map или apply.async) довольно хорошо. Для версии карты вам не нужно определять класс Sum, а функция summers становится
def summers(num_iters):
pool = multiprocessing.Pool(processes=8)
outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
sumArr = np.zeros((1,512*512)) #but I do to make sure I have the memory
outputArr = np.array(pool.map(computation, range(num_iters)))
sumArr = outputArr.sum(0)
pool.close() #not sure if this is still needed since map waits for all iterations
return sumArr