python многопроцессорная карта возвращает Nones - PullRequest
0 голосов
/ 10 апреля 2020

Я пытаюсь ускорить одно из моих существующих приложений. Я нашел этот урок здесь и попытался имитировать c эту функциональность:

var_dict = {}

def init_worker(X,Y, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['Y'] = Y
    var_dict['X_shape'] = X_shape

def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    Y_np = np.frombuffer(var_dict['Y']).reshape(var_dict['X_shape'])
    acf = signal.correlate(X_np[i,:],Y_np[i,:],mode="full", method="fft")
    return acf

if __name__ == '__main__':
    X_shape = (5, 432000)
    # Randomly generate some data
    data1 = np.random.randn(*X_shape)
    data2 = np.random.randn(*X_shape)
    X = RawArray('d', X_shape[0] * X_shape[1])
    Y = RawArray('d', X_shape[0] * X_shape[1])
    # Wrap X as an numpy array so we can easily manipulates its data.
    X_np = np.frombuffer(X).reshape(X_shape)
    Y_np = np.frombuffer(Y).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data1)
    np.copyto(Y_np, data2)
    # Start the process pool and do the computation.
    # Here we pass X and X_shape to the initializer of each worker.
    # (Because X_shape is not a shared variable, it will be copied to each
    # child process.)
    start = timer()
    pool = Pool(processes=2, initializer=init_worker, initargs=(X,Y, X_shape))

    result = pool.map(worker_func, range(X_shape[0]))
    pool.close()
    pool.join()

    print result
    print timer() - start

Этот пример работает отлично. Это на самом деле ускоряет расчет взаимной корреляции.

Когда я попытался интегрировать эту функциональность в мой класс Xcorrelator, он начал показывать странное поведение. Потребление памяти просто взорвалось, и я понятия не имею, как решить эту проблему.

Мой код довольно сложен, но вот как я пытаюсь интегрировать распараллеливание:

var_dict = {}
class Xcorrelator(object):
    .
    .
    .
    def __call__(self, w):
        self.worker_func(w)

    def init_worker(self, X,Y, X_shape):
        # Using a dictionary is not strictly necessary. You can also
        # use global variables.
        print "INIT"
        var_dict['X'] = X
        var_dict['Y'] = Y
        var_dict['X_shape'] = X_shape

    def worker_func(self,i):
        # Simply computes the sum of the i-th row of the input matrix X
        print "WORKER"
        X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])[i,:]
        Y_np = np.frombuffer(var_dict['Y']).reshape(var_dict['X_shape'])[i,:]

        ccf = signal.correlate(X_np,Y_np,mode="full", method="fft")
        return ccf

    def xcorr_parallel(self, maxlag = 600):
        rem_waveform = self._max_waveforms if self._c - self._offset > self._max_waveforms else self._c - self._offset
        shape = (rem_waveform, int((maxlag*self._sampling_rate*2) + 1))

        a, b = self.get_data_mtx()
        X_shape = a.shape

        X = RawArray('d', X_shape[0] * X_shape[1])
        Y = RawArray('d', X_shape[0] * X_shape[1])

        X_np = np.frombuffer(X).reshape(X_shape)
        Y_np = np.frombuffer(Y).reshape(X_shape)


        np.copyto(X_np, a)
        np.copyto(Y_np, b)    

        pool = multiprocessing.Pool(processes=2, initializer=self.init_worker, initargs=(X,Y, X_shape))
        result = pool.map(self, range(X_shape[0]))
        pool.close()
        pool.join()
        print result

        self._xcorrelations[self._offset:self._offset + rem_waveform,:] = result 
        self._offset += self._max_waveforms 

Как Вы видите, что два кода действительно идентичны. Разница лишь в том, что второй пример заключен в класс. Когда я вызываю метод xcorr_parallel в моем тестовом коде, он фактически вычисляет переменную ccf, но функция pool.map возвращает все значения None в массиве. Неважно, что я пробовал, данные в переменной ccf всегда терялись.

Что мне здесь не хватает? Есть ли лучший способ обернуть механизм распараллеливания внутри класса? Как я могу преодолеть эту проблему?

...