Параллель Даска на l oop медленнее, чем одноядерный - PullRequest
0 голосов
/ 03 апреля 2020

Что я пробовал

У меня смущающая параллель для l oop, в которой я перебираю значения 90x360 в двух вложенных циклах for и выполняю некоторые вычисления. Я попытался dask.delayed распараллелить циклы for согласно этого урока , хотя он демонстрируется для очень небольшого набора итераций.

Описание проблемы

Я удивлен, обнаружив, что параллельный код занял 2 ч 39 мин по сравнению с непараллельной синхронизацией 1 ч 54 мин , что означает, что я делаю что-то в корне неправильно или, возможно, графики задач слишком велики для обработки?

Информация о настройке

Этот тест был выполнен для подмножества моих итераций, то есть 10 x 360, но оптимизированный код должен обрабатывать 90 x 360 вложенных итераций. Мой мини-кластер имеет 66 ядер и 256 ГБ оперативной памяти, а 2 файла данных по 4 ГБ и <1 ГБ каждый. Я также запутался между подходом <code>multi-processing против multi-threading для этой задачи. Я думал, что выполнение параллельных циклов в нескольких процессах, подобных реализации по умолчанию joblib, будет способом go, поскольку каждый l oop работает на независимых точках сетки. Но это говорит о том, что multi-threading быстрее и должно быть предпочтительным, если у кого-то нет проблемы с GIL (чего у меня нет). Итак, для вышеуказанного времени я использовал dask.delay параметр планирования по умолчанию, который использует многопоточность для одного процесса.

Упрощенный код

import numpy as np
import pandas as pd
import xarray as xr
from datetime import datetime
from dask import compute, delayed

def add_data_from_small_file(lat):
    """ for each grid-point, get time steps from big-file as per mask, and
        compute data from small file for those time-steps
        Returns: array per latitude which is to be stacked
    """

    for lon in range(0,360):
        # get time steps from big file
        start_time = big_file.time.values[mask1[:, la, lo]] 
        end_time = big_file.time.values[[mask2[:,la,lo]]

        i=0  
        for t1, t2 in zip(start_time, end_time):
              # calculate value from small file for each time pair
              temp_var[i] = small_file.sel(t=slice(t1, t2)).median()
              i=i+1

         temp_per_lon[:, lon] = temp_var
     return temp_per_lon



if __name__ == '__main__':
    t1 = datetime.now()
    small_file = xr.open_dataarray('small_file.nc') # size < 1 GB, 10000x91
    big_file = xr.open_dataset('big_file.nc') # size = 4 GB, 10000x91x360

    delayed_values = [delayed(add_data_from_small_file)(lat) for lat in range(0,10)] # 10 loops for testing, to scale to 90 loops
    # have to delay stacking to avoid memory error
    stack_arr = delayed(np.stack)(delayed_values, axis=1) 
    stack_arr = stack_arr.compute()
    print('Total run time:{}'.format(datetime.now()-t1))

1 Ответ

1 голос
/ 04 апреля 2020

Каждая задержанная задача добавляет около 1 мс накладных расходов. Поэтому, если ваша функция медленная (возможно, вы вызываете какую-то другую дорогую функцию), тогда да, dask.delayed может подойти. Если нет, то вам, вероятно, стоит поискать в другом месте.

Если вам интересно, лучше ли вам подходят потоки или процессы, самый простой способ узнать это просто попробовать оба варианта. Это легко сделать.

dask.compute(*values, scheduler="processes")
dask.compute(*values, scheduler="threads")

Может случиться так, что даже если вы используете numpy массивы, большая часть вашего времени фактически тратится на Python для циклов. Если это так, многопоточность здесь вам не поможет, и реальное решение состоит в том, чтобы прекратить использовать Python для циклов, либо будучи умным с numpy / xarray, либо используя проект, подобный Numba.

...