Как выполнить итеративную многопроцессорную обработку в несколько массивов numpy? - PullRequest
0 голосов
/ 03 августа 2020

Какой правильный способ распараллелить это? По сути, у меня есть очень большой 2D-массив. Я хочу выполнить линейную подгонку каждой строки к отдельному массиву той же длины (x), который будет постоянным для всех строк. Ожидаемый результат - это одномерный массив (data_slopes) с линейными наклонами. Этот код работает, но работает очень медленно:

for j in range(img1_data_r.shape[0]):

    y = img1_data_r[j,:]
    model = LinearRegression()
    model.fit(x.reshape((-1, 1)),y,1)
    data_slopes[j] = model.coef_[0]

У меня нет опыта работы с многопроцессорным пулом, и я безуспешно пытался

Ответы [ 3 ]

0 голосов
/ 03 августа 2020

Если вы можете передать в домен данных, которые необходимо обработать, вы можете использовать xargs для параллельного запуска вашей программы. xargs позволяет вам выполнять программу параллельно, передавая различные параметры, считанные из stdin. Я успешно использовал его, чтобы заставить bash оболочки работать параллельно.

Посмотрите, поможет ли вам этот вопрос: Python чтение. json файлов из GCS в pandas DF параллельно

0 голосов
/ 03 августа 2020

Вот пример того, как вы можете использовать multiprocessing для работы со строками массива 2d numpy и постоянного вектора.

В этом примере тот же вектор b (эквивалентный ваш x) создается точками с каждой строкой массива a.

import numpy as np
from multiprocessing import Pool


def dot_product(row, vec):
    return (row * vec).sum()

a = np.array([[1, 2, 3],
              [4, 5, 6],
              [7, 8, 9],
              [10, 11, 12]])

b = np.array([10, 11, 12])

p = Pool(3)  # max number of simultaneous processes

print(p.starmap(dot_product, ((row, b) for row in a)))

Обратите внимание, что вы можете передавать только выбираемые объекты в multiprocessing.Pool. Хотя массивы numpy можно выбирать, а функция (например, dot_product здесь) - методы экземпляра - нет. Таким образом, вы не можете использовать свою модель (LinearRegression()) в качестве первого аргумента для Pool.map (или Pool.starmap). Вместо этого вам нужно будет создать экземпляр LinearRegression отдельно внутри функции для каждого процесса.

Собрав все это вместе для вас (хотя, очевидно, у меня недостаточно информации для тестирования), вы получите что-то вроде этого :

def get_data_slope(row, x):
    model = LinearRegression()
    model.fit(x.reshape((-1, 1)), row, 1)
    return model.coef_[0]

p = Pool(3)

data_slopes[:] = p.starmap(get_data_slope, ((row, x) for row in img1_data_r))
0 голосов
/ 03 августа 2020

Вы можете попробовать следующее. Вместо того, чтобы перебирать диапазоны, я бы порекомендовал вам создать функцию, которая принимает 2D-массив и возвращает ожидаемый результат LinearRegression. Затем вы можете создать список, содержащий все 2D-массивы, которые вам нужно перебрать (итератор) -

#Function that works on a single object
def fn(x):
    out = x**3 #your code here
    return out

iterator = [1,2,3,4,5,6,7,8,9,10]  #list of objects that you need to run your function on

pool = mp.Pool(processes=4)  #Number of cores you want to utilize
results = [pool.apply_async(fn, args=(x,)) for x in iterator] #maps the iterator and the function to each core asynchronously
output = [p.get() for p in results]   #collects and returns the results as a list of outputs.
output
[1, 8, 27, 64, 125, 216, 343, 512, 729, 1000]

pool.apply_async должен быть сверхбыстрым вместе с пониманием списка, поскольку он асинхронно передает операции к ядрам, не дожидаясь, пока все ядра завершат sh свои операции перед передачей следующего пакета.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...