Многопроцессорная обработка цикла с добавлением в список в качестве результата - PullRequest
0 голосов
/ 27 июня 2019

Мне нужно распараллелить цикл for.Мой текущий код перебирает список идентификаторов, которые я получаю из набора данных xarray, получает данные строки из набора данных xarray с соответствующим идентификатором, вызывает функцию (вычисляет треугольное распределение данных), добавляет распределение результатов дляФункция в список и после этого преобразует список в набор данных xarray, где каждый результат связан с соответствующим идентификатором, поэтому позже этот набор данных может быть добавлен с помощью идентификатора в основной набор данных.

Мой кусок кода выглядит примерно так:

from sklearn.preprocessing import MinMaxScaler
import xarray as xr
import scipy.stats as st

function call_func(data):
   scaler = MinMaxScaler()
   norm_data = scaler.fit_transform(np.reshape(data, (len(data),1)))
   params = st.triang.fit(norm_data)
   arg,loc,scale = params[:-2],params[-2],params[-1]
   dist = st.triang(loc=loc, scale=scale, *arg)
   return dist

if __name__ == "__main__":
for id in my_dataset['id'].values:
        row_data= my_dataset.sel(id=id)['data'].values[0]
        if len(row_data)>3 and all(row_data== 0) == False:
                result = call_func(row_data)
                result_list.append(result)
        else:
            result_list.append([])

new_dataset = xr.Dataset({'id': my_dataset['id'].values,
                          'dist_data':(['id','dist'],
                           np.reshape(np.array(result_list),(len(result_list),1)))
                           })

Поскольку id_array огромен, я хочу парализовать цикл.Это общий вопрос, однако я новичок в многопроцессорном инструменте.Есть ли у вас рекомендации, как сочетать многопроцессорность с этой задачей?Мои исследования показали, что многопроцессорная обработка и добавление в список - не самая разумная вещь.

1 Ответ

0 голосов
/ 27 июня 2019

Я попытаюсь привести простой фиктивный пример, надеясь, что вы сможете вывести модификацию, необходимую для вашего кода:

вот кодовая версия обычного цикла:

id_array = [*range(10)]

result = []
for id in id_array:
    if id % 2 == 0:
        result.append((id, id))
    else:
        result.append((id, id ** 2))

print(result)

Выход:

[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49 ), (8, 8), (9, 81)]


Здесь, используя ProcessPoolExecutor, я распараллелил его на 4 процесса:

from concurrent.futures import ProcessPoolExecutor

id_array = [*range(10)]


def myfunc(id):
    if id % 2 == 0:
        return id, id
    else:
        return id, id ** 2


result = []
with ProcessPoolExecutor(max_workers=4) as executor:
    for r in executor.map(myfunc, id_array):
        result.append(r)

print(result)

Вывод (тоже самое):

[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49 ), (8, 8), (9, 81)]


в основном:

  1. извлекает содержимое for в функцию, которая возвращает желаемое значение
  2. использовать ProcessPoolExecutor с executor.map(myfunc, id_array)
  3. добавить возвращаемое значение в список результатов.
...