Несоответствие между параллельным и линейным вложенным циклом - PullRequest
0 голосов
/ 27 марта 2020

Я хочу распараллелить фрагмент кода, похожий на следующий:

 Ngal=10
 sampind=[7,16,22,31,45]
 samples=0.3*np.ones((60,Ngal))
 zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
 toavg=[]
 for j in range(Ngal):
     gal=[] 
     for m in sampind:
          gal.append(samples[m][j]-zt[j])
     toavg.append(np.mean(gal))
 accuracy=np.mean(toavg)

, поэтому я последовал совету здесь и переписал его следующим образом:

toavg=[]
gal=[]
p = mp.Pool()

def deltaz(params):
    j=params[0] # index of the galaxy
    m=params[1] # indices for which we have sampled redshifts
    gal.append(samples[m][j]-zt[j])
    return np.mean(gal)

j=(np.linspace(0,Ngal-1,Ngal).astype(int))
m=sampind
grid=[j,m]
input=itertools.product(*grid)
results = p.map(deltaz,input)
accuracy=np.mean(results)
p.close()
p.join()

но результаты не совпадают. На самом деле, иногда они есть, иногда нет. Это не кажется очень детерминированным c. Правильный ли мой подход? Если нет, что я должен исправить? Спасибо! Модули, которые вам понадобятся для воспроизведения приведенных выше примеров:

import numpy as np
import multiprocess as mp
import itertools

Спасибо!

1 Ответ

1 голос
/ 28 марта 2020

Первая проблема, которую я вижу, заключается в том, что вы создаете глобальную переменную gal, к которой обращается функция deltaz. Однако они не разделяются между процессами пула, а создаются для каждого процесса отдельно. Вам придется использовать разделяемую память, если вы хотите, чтобы они разделяли эту структуру. Возможно, именно поэтому вы видите недетерминированное поведение c.

Следующая проблема заключается в том, что вы фактически не выполняете одно и то же задание с другим вариантом. Первый вы берете среднее значение каждого набора средних (гал). Параллельный принимает среднее из всех когда-либо оказавшихся элементов в этом списке. Это недетерминировано c, поскольку элементы назначаются процессам по мере их появления, и это не обязательно предсказуемо.

Я бы предложил распараллелить внутренний l oop. Для этого вам нужно, чтобы zt и samples находились в общей памяти, поскольку к ним обращаются все процессы. Это может стать опасным, если вы изменяете данные, но так как вы, кажется, только читаете, это должно быть хорошо.

import numpy as np
import multiprocessing as mp
import itertools
import ctypes
#Non-parallel code
Ngal=10
sampind=[7,16,22,31,45]
samples=0.3*np.ones((60,Ngal))
zt=[2.15,7.16,1.23,3.05,4.1,2.09,1.324,3.112,0.032,0.2356]
#Nonparallel
toavg=[]
for j in range(Ngal):
    gal=[]
    for m in sampind:
         gal.append(samples[m][j]-zt[j])
    toavg.append(np.mean(gal))
accuracy=np.mean(toavg)
print(toavg)

# Parallel function
def deltaz(j):
    sampind=[7,16,22,31,45]
    gal = []
    for m in sampind:
         gal.append(samples[m][j]-zt[j])
    return np.mean(gal)
# Shared array for zt
zt_base = mp.Array(ctypes.c_double, int(len(zt)),lock=False)
ztArr = np.ctypeslib.as_array(zt_base)
#Shared array for samples
sample_base = mp.Array(ctypes.c_double, int(np.product(samples.shape)),lock=False)
sampArr = np.ctypeslib.as_array(sample_base)
sampArr = sampArr.reshape(samples.shape)
#Copy arrays to shared
sampArr[:,:] = samples[:,:]
ztArr[:] = zt[:]
with mp.Pool() as p:
    result = p.map(deltaz,(np.linspace(0,Ngal-1,Ngal).astype(int)))
    print(result)

Вот пример, который дает те же результаты. Вы можете добавить к этому больше сложности, как считаете нужным, но я бы прочитал о многопроцессорности в целом и типах / областях памяти, чтобы понять, что будет, а что не будет работать. Вы должны больше заботиться, когда попадаете в мир многопроцессорности. Дайте мне знать, если это не поможет, и я постараюсь обновить его, чтобы он помог.

...