Как распараллелить подсчет суммы в python numpy? - PullRequest
3 голосов
/ 30 января 2012

У меня есть сумма, которую я пытаюсь вычислить, и мне трудно распараллелить код.Вычисление, которое я пытаюсь распараллелить, довольно сложное (в нем используются как массивы, так и разреженные матрицы).Он выплевывает массив, и я хочу суммировать выходные массивы из 1000 вычислений.В идеале я бы держал промежуточную сумму по всем итерациям.Однако я не смог понять, как это сделать.

До сих пор я пытался использовать функцию Parallels Joblib и функцию pool.map с пакетом многопроцессорной обработки python.Для обоих из них я использую внутреннюю функцию, которая возвращает массив numpy.Эти функции возвращают список, который я преобразовываю в пустой массив, а затем суммирую.

Однако после того, как функция joblib Parallel завершает все итерации, основная программа никогда не продолжает работать (похоже, что исходное задание находится в приостановленном состоянии с использованием 0% ЦП).Когда я использую pool.map, я получаю ошибки памяти после завершения всех итераций.

Есть ли способ просто распараллелить текущую сумму массивов?

Edit : Цель состоит в том, чтобы сделать что-то вроде следующего, за исключением параллельного.

def summers(num_iters):

    sumArr = np.zeros((1,512*512)) #initialize sum
    for index in range(num_iters):
        sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

    return sumArr

Ответы [ 2 ]

5 голосов
/ 02 февраля 2012

Я понял, как выполнить параллелизацию суммы массивов с многопроцессорностью, apply_async и обратными вызовами, поэтому я публикую это здесь для других людей Я использовал пример страницы для Parallel Python для класса обратного вызова Sum, хотя на самом деле я не использовал этот пакет для реализации. Это дало мне идею использовать обратные вызовы, хотя. Вот упрощенный код того, что я в итоге использовал, и он делает то, что хотел.

import multiprocessing
import numpy as np
import thread

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)
    def __init__(self):
        self.value = np.zeros((1,512*512)) #this is the initialization of the sum
        self.lock = thread.allocate_lock()
        self.count = 0

    def add(self,value):
        self.count += 1
        self.lock.acquire() #lock so sum is correct if two processes return at same time
        self.value += value #the actual summation
        self.lock.release()

def computation(index):
    array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes
    return array1

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    sumArr = Sum() #create an instance of callback class and zero the sum
    for index in range(num_iters):
        singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)

    pool.close()
    pool.join() #waits for all the processes to finish

    return sumArr.value

Мне также удалось заставить это работать, используя распараллеленную карту, что было предложено в другом ответе. Я пробовал это раньше, но я не реализовал это правильно. Оба способа работают, и я думаю, этот ответ объясняет вопрос о том, какой метод использовать (map или apply.async) довольно хорошо. Для версии карты вам не нужно определять класс Sum, а функция summers становится

def summers(num_iters):
    pool = multiprocessing.Pool(processes=8)

    outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these
    sumArr = np.zeros((1,512*512))              #but I do to make sure I have the memory

    outputArr = np.array(pool.map(computation, range(num_iters)))
    sumArr = outputArr.sum(0)

    pool.close() #not sure if this is still needed since map waits for all iterations

    return sumArr
1 голос
/ 31 января 2012

Я не уверен, что понимаю проблему.Вы просто пытаетесь разделить список на пул рабочих, чтобы они сохранили текущую сумму своих вычислений и суммировали результат?

#!/bin/env python
import sys
import random
import time
import multiprocessing
import numpy as np

numpows = 5
numitems = 25
nprocs = 4

def expensiveComputation( i ):
  time.sleep( random.random() * 10 )
  return np.array([i**j for j in range(numpows)])

def listsum( l ):
  sum = np.zeros_like(l[0])
  for item in l:
    sum = sum + item
  return sum

def partition(lst, n):
  division = len(lst) / float(n)
  return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ]

def myRunningSum( l ):
  sum = np.zeros(numpows)
  for item in l:
     sum = sum + expensiveComputation(item)
  return sum

if __name__ == '__main__':

  random.seed(1)
  data = range(numitems)

  pool = multiprocessing.Pool(processes=4,)
  calculations = pool.map(myRunningSum, partition(data,nprocs))

  print 'Answer is:', listsum(calculations)
  print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.])

(функция разделения, полученная из Python: Slicingсписок в n разделов почти одинаковой длины )

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...