Я пытаюсь ускорить одно из моих существующих приложений. Я нашел этот урок здесь и из этого я построил контрольный пример:
import numpy as np
from scipy import signal
from multiprocessing import Pool # Process pool
from multiprocessing import sharedctypes
from timeit import default_timer as timer
col = 432000
row = 365
X = np.random.random((row, col))
Y = np.random.random((row, col))
result = np.ctypeslib.as_ctypes(np.zeros((row, 2*col-1)))
shared_array = sharedctypes.RawArray(result._type_, result)
def autocorr(row):
print row
tmp = np.ctypeslib.as_array(shared_array)
tmp[row,:] = signal.correlate(X[row,:], Y[row,:], mode="full", method="fft")
window_idxs = np.arange(0,X.shape[0], 1)
start = timer()
p = Pool(2)
p.map(autocorr, window_idxs)
result = np.ctypeslib.as_array(shared_array)
p.close()
p.join()
print timer() - start
Он отлично работает. Это решение почти в два раза быстрее, чем версия без распараллеливания.
Когда я попытался интегрировать эту функциональность в мой класс Xcorrelator
, он начал показывать странное поведение. Потребление памяти просто взорвалось, и я понятия не имею, как справиться с этой проблемой.
Мой код довольно сложен, но вот как я пытаюсь интегрировать распараллеливание:
class Xcorrelator(object):
.
.
.
def __call__(self, w):
self.xcorr_helper(w)
def xcorr_parallel(self, maxlag = 600):
rem_waveform = self._c #integer 0<c<365
shape = (rem_waveform, int((maxlag*self._sampling_rate*2) + 1))
self._a, self._b = self.get_data_mtx() # This is the same numpy array as X and Y in the test code. Have the same size
tcorr = np.arange(-self._a.shape[1] + 1, self._a.shape[1])
self._dN = np.where(np.abs(tcorr) <= 600*self._sampling_rate)[0]
result_ccfs = np.ctypeslib.as_ctypes(np.zeros(shape))
global shared_array
shared_array = multiprocessing.sharedctypes.RawArray(result_ccfs._type_, result_ccfs)
self._lagtime = np.arange(-maxlag, maxlag + 1,1)
w = np.arange(0,rem_waveform, 1) #numpy array [0..rem_waveform -1]
#This is an important part!
p = multiprocessing.Pool(4)
p.map(self, w)
p.close()
p.join()
result = np.ctypeslib.as_array(shared_array)
self._xcorrelations[self._offset:self._offset + rem_waveform,:] = result
self._offset += self._max_waveforms
#works the same way as in the test code
def xcorr_helper(self,row):
tmp = np.ctypeslib.as_array(shared_array)
tmp[int(row),:] = signal.correlate(
in1 = self._a[row,:],
in2 = self._b[row,:],
mode="full",
method="fft"
)[self._dN]
Почему он съедает всю мою память, когда я оборачиваю распараллеливание внутри класса? В чем разница в двух случаях? Есть ли лучший подход?