Я пытаюсь ускорить одно из моих существующих приложений. Я нашел этот урок здесь и попытался имитировать c эту функциональность:
var_dict = {}
def init_worker(X,Y, X_shape):
# Using a dictionary is not strictly necessary. You can also
# use global variables.
var_dict['X'] = X
var_dict['Y'] = Y
var_dict['X_shape'] = X_shape
def worker_func(i):
# Simply computes the sum of the i-th row of the input matrix X
X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
Y_np = np.frombuffer(var_dict['Y']).reshape(var_dict['X_shape'])
acf = signal.correlate(X_np[i,:],Y_np[i,:],mode="full", method="fft")
return acf
if __name__ == '__main__':
X_shape = (5, 432000)
# Randomly generate some data
data1 = np.random.randn(*X_shape)
data2 = np.random.randn(*X_shape)
X = RawArray('d', X_shape[0] * X_shape[1])
Y = RawArray('d', X_shape[0] * X_shape[1])
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X).reshape(X_shape)
Y_np = np.frombuffer(Y).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data1)
np.copyto(Y_np, data2)
# Start the process pool and do the computation.
# Here we pass X and X_shape to the initializer of each worker.
# (Because X_shape is not a shared variable, it will be copied to each
# child process.)
start = timer()
pool = Pool(processes=2, initializer=init_worker, initargs=(X,Y, X_shape))
result = pool.map(worker_func, range(X_shape[0]))
pool.close()
pool.join()
print result
print timer() - start
Этот пример работает отлично. Это на самом деле ускоряет расчет взаимной корреляции.
Когда я попытался интегрировать эту функциональность в мой класс Xcorrelator, он начал показывать странное поведение. Потребление памяти просто взорвалось, и я понятия не имею, как решить эту проблему.
Мой код довольно сложен, но вот как я пытаюсь интегрировать распараллеливание:
var_dict = {}
class Xcorrelator(object):
.
.
.
def __call__(self, w):
self.worker_func(w)
def init_worker(self, X,Y, X_shape):
# Using a dictionary is not strictly necessary. You can also
# use global variables.
print "INIT"
var_dict['X'] = X
var_dict['Y'] = Y
var_dict['X_shape'] = X_shape
def worker_func(self,i):
# Simply computes the sum of the i-th row of the input matrix X
print "WORKER"
X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])[i,:]
Y_np = np.frombuffer(var_dict['Y']).reshape(var_dict['X_shape'])[i,:]
ccf = signal.correlate(X_np,Y_np,mode="full", method="fft")
return ccf
def xcorr_parallel(self, maxlag = 600):
rem_waveform = self._max_waveforms if self._c - self._offset > self._max_waveforms else self._c - self._offset
shape = (rem_waveform, int((maxlag*self._sampling_rate*2) + 1))
a, b = self.get_data_mtx()
X_shape = a.shape
X = RawArray('d', X_shape[0] * X_shape[1])
Y = RawArray('d', X_shape[0] * X_shape[1])
X_np = np.frombuffer(X).reshape(X_shape)
Y_np = np.frombuffer(Y).reshape(X_shape)
np.copyto(X_np, a)
np.copyto(Y_np, b)
pool = multiprocessing.Pool(processes=2, initializer=self.init_worker, initargs=(X,Y, X_shape))
result = pool.map(self, range(X_shape[0]))
pool.close()
pool.join()
print result
self._xcorrelations[self._offset:self._offset + rem_waveform,:] = result
self._offset += self._max_waveforms
Как Вы видите, что два кода действительно идентичны. Разница лишь в том, что второй пример заключен в класс. Когда я вызываю метод xcorr_parallel
в моем тестовом коде, он фактически вычисляет переменную ccf
, но функция pool.map
возвращает все значения None
в массиве. Неважно, что я пробовал, данные в переменной ccf
всегда терялись.
Что мне здесь не хватает? Есть ли лучший способ обернуть механизм распараллеливания внутри класса? Как я могу преодолеть эту проблему?