Многопоточность nitey nditerator - PullRequest
       4

Многопоточность nitey nditerator

0 голосов
/ 19 сентября 2019

Для реализации MCMC я хочу вычислить тензор ковариации C в единицах.

Рабочий однопоточный код

Расстояние между двумя элементами основано на расстоянии между их индексами.Для справки, вот рабочий однопоточный код (с примером расстояния):

import numpy as np

#set size, dimensions, etc
size = 20
ndim = 2
shape = (size,)*ndim*2

#initialize tensor
C = np.zeros(shape)
#example distance
dist = lambda x, y: np.sqrt(np.sum((x-y)**2))

#this runs as a class method, so please forgive my sloppy coding here
def update_tensor():
    it = np.nditer(C, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        idx = np.array(it.multi_index)
        it[0] = dist(idx[:idx.shape[0]//2], idx[idx.shape[0]//2:])
        it.iternext()

update_tensor()

Попытка решения

Теперь проблема в том, что при применении C к матрице x является многопоточнымОперация:

x = np.random.standard_normal((size,)*ndim)
result = np.tensordot(C, x, axes=ndim)

, не суммирующая записи C не является.Моя идея состояла в том, чтобы разделить C после инициализации вдоль его первой оси и выполнить итерацию по частям отдельно:

import multiprocessing
def _calc_distances(C):
    'Calculate distances of submatrices'
    it = np.nditer(C, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        idx = np.array(it.multi_index)
        it[0] = dist(idx[:idx.shape[0]//2], idx[idx.shape[0]//2:])
        it.iternext()
    return C

def update_tensor(C):
    'Updates Covariance Operator'   
    #Multicore Processing
    n_processes = multiprocessing.cpu_count()
    Chunks = [
        C[i*C.shape[0]//n_processes:(i+1)*C.shape[0]//n_processes] for i in range(0, n_processes-1)
    ]
    Chunks.append(C[C.shape[0]//n_processes*(n_processes-1):])
    with multiprocessing.Pool(n_processes+1) as p:
        #map and stitch together
        C = np.concatenate(
            p.map(_calc_distances, Chunks)
        )

Но это не удается, поскольку изменяются значения подматриц.

Вопрос

Есть ли лучшее решение для этого?Как мне исправить проблему с индексом?Вероятно, самый лучший способ - это просто перебирать части массива с потоками, разделяющими данные C. Это возможно?

Q / A

Q: Вам нужно использовать numpy?итератор?A: Нет, это хорошо, но я могу отказаться от этого.

1 Ответ

0 голосов
/ 19 сентября 2019

работал так.Просто собираюсь опубликовать урок здесь.

Тесты

CPU: Intel Core i5-6300U@2.5GHz, boosting to ~2.9GHz
Windows 10 64-bit, Python 3.7.4, Numpy 1.17

Performance

Pro: Меньше времени вычислений Con: Использует немного большеБАРАН;несколько сложный код.

Рабочий многопоточный код

import multiprocessing
import numpy as np

class CovOp(object):
    'F[0,1]^ndim->C[0,1]^ndim'
    def f(self, r):
        return np.exp(-r/self.ro)#(1 + np.sqrt(3)*r / self.ro) * np.exp(-np.sqrt(3) * r / self.ro)

    def dist(self, x,y):
        return np.sum((x-y)**2)

    def __init__(self, ndim, size, sigma=1, ro=1):
        self.tensor_cached = False
        self.inverse_cached = False
        self.ndim = ndim
        self.size = size
        self.shape = (size,)*ndim*2
        self.C = np.zeros(self.shape)
        self.Inv = np.zeros(self.shape)
        self.ro = ro * size
        self.sigma = sigma      

    def __call__(self, x):
        if not self.tensor_cached:
            self.update_tensor
        if self.ndim == 0:
            return self.sigma * self.C * x
        elif self.ndim == 1:
            return self.sigma * np.dot(self.C, x)
        return self.sigma * np.tensordot(self.C, x, axes=self.ndim)

    def _calc_distances(self, Chunk:tuple):
        'Calculate distances of submatrices'
        C, offset = Chunk
        it = np.nditer(C, flags=['multi_index'], op_flags=['readwrite'])
        while not it.finished:
            idx = np.array(it.multi_index)
            idx[0]+=offset
            d = self.dist(idx[:idx.shape[0]//2], idx[idx.shape[0]//2:])
            it[0] = self.f(d)
            it.iternext()
        return C

    def update_tensor(self):
        'Updates Covariance Operator'   
        #Multicore Processing
        n_processes = multiprocessing.cpu_count()
        Chunks = [
            (
                self.C[i*self.C.shape[0]//n_processes:(i+1)*self.C.shape[0]//n_processes],
                i*self.C.shape[0]//n_processes) for i in range(0, n_processes-1)
        ]
        Chunks.append((
                self.C[self.C.shape[0]//n_processes*(n_processes-1):],
                self.C.shape[0]//n_processes*(n_processes-1)
            )
        )
        with multiprocessing.Pool(n_processes+1) as p:
            self.C = np.concatenate(
                p.map(self._calc_distances, Chunks)
            )      
        self.tensor_cached = True
        #missing cholesky decomposition

    def update_inverse(self):
        if self.ndim==1:
            self.Inv = np.linalg.inv(self.C)
        elif self.ndim>1:
            self.Inv = np.linalg.tensorinv(self.C)
        else:
            self.Inv = 1/self.C
        self.inverse_cached = True

    def inv(self, x):
        if self.ndim == 0:
            return self.Inv * x / self.sigma
        elif self.ndim == 1:
            return np.dot(self.Inv, x) / self.sigma
        return np.tensordot(self.Inv, x) / self.sigma
if __name__=='__main__':

        size = 30
        ndim = 2
        depth = 1

        Cov = CovOp(ndim, size, 1, .2)


        import time

        n_tests = 5
        t_start = time.perf_counter()
        for i in range(n_tests):
            Cov.update_tensor()
        t_stop = time.perf_counter()
        dt_new = t_stop - t_start

        print(
        '''Benchmark; NDim: %s, Size: %s NTests: %s
        Mean time per test:
            Multithreaded %ss'''%(ndim, size, n_tests, dt_new/n_tests)
        )
...