Python многопроцессорная перегрузка памяти - PullRequest
1 голос
/ 06 апреля 2020

Я пытаюсь ускорить одно из моих существующих приложений. Я нашел этот урок здесь и из этого я построил контрольный пример:

import numpy as np
from scipy import signal
from multiprocessing import Pool #  Process pool
from multiprocessing import sharedctypes
from timeit import default_timer as timer

col = 432000
row = 365

X = np.random.random((row, col))
Y = np.random.random((row, col))

result = np.ctypeslib.as_ctypes(np.zeros((row, 2*col-1)))
shared_array = sharedctypes.RawArray(result._type_, result)

def autocorr(row):
    print row
    tmp = np.ctypeslib.as_array(shared_array)
    tmp[row,:] = signal.correlate(X[row,:], Y[row,:], mode="full", method="fft")


window_idxs = np.arange(0,X.shape[0], 1)

start = timer()
p = Pool(2)
p.map(autocorr, window_idxs)
result = np.ctypeslib.as_array(shared_array)
p.close()
p.join()
print timer() - start

Он отлично работает. Это решение почти в два раза быстрее, чем версия без распараллеливания.

Когда я попытался интегрировать эту функциональность в мой класс Xcorrelator, он начал показывать странное поведение. Потребление памяти просто взорвалось, и я понятия не имею, как справиться с этой проблемой.

Мой код довольно сложен, но вот как я пытаюсь интегрировать распараллеливание:

class Xcorrelator(object):
    .
    .
    .
    def __call__(self, w):
        self.xcorr_helper(w)

    def xcorr_parallel(self, maxlag = 600):
        rem_waveform = self._c #integer 0<c<365
        shape = (rem_waveform, int((maxlag*self._sampling_rate*2) + 1))


        self._a, self._b = self.get_data_mtx() # This is the same numpy array as X and Y in the test code. Have the same size
        tcorr = np.arange(-self._a.shape[1] + 1, self._a.shape[1])
        self._dN = np.where(np.abs(tcorr) <= 600*self._sampling_rate)[0]

        result_ccfs =  np.ctypeslib.as_ctypes(np.zeros(shape))
        global shared_array
        shared_array = multiprocessing.sharedctypes.RawArray(result_ccfs._type_, result_ccfs)
        self._lagtime = np.arange(-maxlag, maxlag + 1,1)

        w = np.arange(0,rem_waveform, 1) #numpy array [0..rem_waveform -1]

        #This is an important part!
        p = multiprocessing.Pool(4)
        p.map(self, w)
        p.close()
        p.join()

        result = np.ctypeslib.as_array(shared_array)
        self._xcorrelations[self._offset:self._offset + rem_waveform,:] = result 
        self._offset += self._max_waveforms 

    #works the same way as in the test code
    def xcorr_helper(self,row): 
        tmp = np.ctypeslib.as_array(shared_array)
        tmp[int(row),:] = signal.correlate(
            in1 = self._a[row,:],
            in2 = self._b[row,:],
            mode="full", 
            method="fft"
        )[self._dN]

Почему он съедает всю мою память, когда я оборачиваю распараллеливание внутри класса? В чем разница в двух случаях? Есть ли лучший подход?

...