Многопроцессорность: как преобразовать mp.map в функцию, хранящую элементы в списке? - PullRequest
1 голос
/ 04 мая 2019

У меня есть программа, похожая на следующую:

import time
from multiprocessing import Pool

class a_system():
    def __init__(self,N):
        self.N = N
        self.L = [0 for _ in range(self.N)]
    def comp(self,n):
        self.L[n] = 1
        return self.L
    def reset(self):
        self.L = [0 for _ in range(self.N)]

def individual_sim(iter):
    global B, L, sys
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

def simulate(N_mc):
    global B, L, sys
    L = [[] for _ in range(N_mc)]
    B = 0
    sys = a_system(N_mc)
    [*map(individual_sim, range(N_mc))]
    # with Pool() as P:
    #     P.map(individual_sim,range(N_mc))
    return L, B

if __name__=="__main__":
    start = time.time()
    L, B = simulate(N_mc=5)
    print(L)
    print(B)
    print("Time elapsed: ",time.time()-start)

Здесь я бы хотел распараллелить строку [*map(individual_sim, range(N_mc))] с многопроцессорной обработкой. Однако, заменив эту строку на

with Pool() as P:
     P.map(individual_sim,range(N_mc))

возвращает пустой список списков.

Если вместо этого я использую P.map_async, P.imap или P.imap_unordered, я не получу сообщение об ошибке, но список и B останутся пустыми.

Как я могу распараллелить этот код?

P.S. Я пробовал ThreadPool из multiprocessing.pool, но я бы хотел этого избежать, потому что класс a_system, который немного сложнее, чем показанный здесь, должен иметь различную копию для каждого работника (я получаю exit code 139 (interrupted by signal 11: SIGSEGV)).

P.S.2 Я мог бы попытаться использовать sharedctypes или Managers (?), Но я не уверен, как они работают, и какой из них я должен использовать (или комбинацию?).

P.S.3 Я также попытался изменить individual_sim как

def individual_sim(iter,B,L,sys):
    sys.reset()
    L[iter] = sys.comp(iter)
    B += sum(L[iter])
    time.sleep(1)
    return L, B

и использовать следующее в simulation:

   from functools import partial
   part_individual_sim = partial(individual_sim, B=B, L=L, sys=sys)
   with Pool() as P:
        P.map(part_individual_sim,range(N_mc))

Но я все еще получаю пустые списки.

Ответы [ 2 ]

3 голосов
/ 04 мая 2019

Мне не совсем понятно, какая у вас бизнес-логика, но вы не можете изменять глобальные переменные в родительском элементе из своих дочерних процессов.Отдельные процессы не разделяют свое адресное пространство.

Вы можете сделать L a Manager.List и B a Manager.Value, чтобы изменить их из ваших рабочих процессов.Менеджер-объекты живут в отдельном серверном процессе, и вы можете изменять их с помощью прокси-объектов.Кроме того, вам нужно будет использовать Manager.Lock при изменении этих общих объектов для предотвращения повреждения данных.

Вот сокращенный пример, с которого следует начать:

import time
from multiprocessing import Pool, Manager


def individual_sim(mlist, mvalue, mlock, idx):
    # in your real computation, make sure to not hold the lock longer than
    # really needed (e.g. calculations without holding lock)
    with mlock:
        mlist[idx] += 10
        mvalue.value += sum(mlist)


def simulate(n_workers, n):

    with Manager() as m:
        mlist = m.list([i for i in range(n)])
        print(mlist)
        mvalue = m.Value('i', 0)
        mlock = m.Lock()

        iterable = [(mlist, mvalue, mlock, i) for i in range(n)]

        with Pool(processes=n_workers) as pool:
             pool.starmap(individual_sim, iterable)

        # convert to non-shared objects before terminating manager
        mlist = list(mlist)
        mvalue = mvalue.value

    return mlist, mvalue


if __name__=="__main__":

    N_WORKERS = 4
    N = 20

    start = time.perf_counter()
    L, B = simulate(N_WORKERS, N)
    print(L)
    print(B)
    print("Time elapsed: ",time.perf_counter() - start)

Пример вывода:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
5900
Time elapsed:  0.14064819699706277

Process finished with exit code 0

Можно также использовать параметр initializer пула для передачи прокси при инициализации работника и регистрации их как глобальных, вместо отправки их в качестве обычных аргументов при вызове starmap.

Немного больше об использовании Manager (релевантно: вложенные прокси). Я написал здесь .

1 голос
/ 04 мая 2019

модуль multiprocessing работает за счет fork основного процесса (или выполнения большего количества копий интерпретатора Python, особенно под Windows).

поэтому вы увидите глобальные переменные, но онине будет разделяться между процессами - если вы не пойдете на специальные меры, такие как, например, явное разделение памяти.Вам лучше передать требуемое состояние в виде параметров функции (или через Pool 's initializer и initargs) и передать результаты обратно через возвращаемое значение.

это имеет тенденцию ограничивать вашенемного выбора дизайна, особенно если вам нужно передать много состояний (например, в качестве данных, которые вы хотите разместить)

это очень легкая оболочка для довольно низкоуровневых примитивов, следовательно, она не так интересна, как вещикак Dask, но производительность, как правило, лучше, если вы можете жить с ограничениями

редактирования, чтобы включить некоторый демонстрационный код, который предполагает, что переменная N_mc в вашем вопросе связана с тем, что вы выполняете какое-то монте-карло / рандомизированное приближение,Я начинаю с использования некоторых библиотек:

from multiprocessing import Pool

from PIL import Image
import numpy as np

и определяю рабочую функцию и код для ее инициализации:

def initfn(path):
    # make sure worker processes don't share RNG state, see:
    #   https://github.com/numpy/numpy/issues/9650
    np.random.seed()

    global image
    with Image.open(path) as img:
        image = np.asarray(img.convert('L'))

def worker(i, nsamps):
    height, width = image.shape
    subset = image[
        np.random.randint(height, size=nsamps),
        np.random.randint(width, size=nsamps),
    ]
    return np.mean(subset)

def mc_mean(path, nsamples, niter):
    with Pool(initializer=initfn, initargs=(path,)) as pool:
        params = [(i, nsamples) for i in range(niter)]
        return pool.starmap(worker, params)

, то есть initfn считывает файл JPEG / PNG в numpyмассив, тогда worker просто вычисляет среднее значение (то есть яркость) для некоторого случайного подмножества пикселей.Обратите внимание, что цветные изображения загружаются в виде 3d-матриц, проиндексированных [row, col, channel] (каналы обычно 0 = красный, 1 = синий, 2 = зеленый).Кроме того, мы также явно вызываем np.random.seed, чтобы убедиться, что наши рабочие задания не получают одинаковую последовательность случайных значений.

Затем мы можем запустить это и построить вывод, чтобы убедиться, что все выглядит хорошо:

import scipy.stats as sps
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='ticks')

filename = 'an_image.jpeg'
result = mc_mean(filename, 1234, 56789)

# Histogram of our results
plt.hist(result, 201, density=True, alpha=0.5, edgecolor='none')

# also calculate/display expected distribution
with Image.open(filename) as img:
    arr = np.asarray(img.convert('L'))
    # approximate distribution of monte-carlo error 
    mcdist = sps.norm(np.mean(arr), np.std(arr) / np.sqrt(1234))

mn,mx = plt.xlim()
plt.xlim(mn, mx)

x = np.linspace(mn, mx, 201)
plt.plot(x, mcdist.pdf(x), '--', color='C1')
sns.despine()

, что должно дать нам что-то вроде:

MC distribution

очевидно, это будет зависеть от используемого изображения, это от этот JPEG .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...