Что я пробовал
У меня смущающая параллель для l oop, в которой я перебираю значения 90x360 в двух вложенных циклах for и выполняю некоторые вычисления. Я попытался dask.delayed
распараллелить циклы for согласно этого урока , хотя он демонстрируется для очень небольшого набора итераций.
Описание проблемы
Я удивлен, обнаружив, что параллельный код занял 2 ч 39 мин по сравнению с непараллельной синхронизацией 1 ч 54 мин , что означает, что я делаю что-то в корне неправильно или, возможно, графики задач слишком велики для обработки?
Информация о настройке
Этот тест был выполнен для подмножества моих итераций, то есть 10 x 360, но оптимизированный код должен обрабатывать 90 x 360 вложенных итераций. Мой мини-кластер имеет 66 ядер и 256 ГБ оперативной памяти, а 2 файла данных по 4 ГБ и <1 ГБ каждый. Я также запутался между подходом <code>multi-processing против multi-threading
для этой задачи. Я думал, что выполнение параллельных циклов в нескольких процессах, подобных реализации по умолчанию joblib
, будет способом go, поскольку каждый l oop работает на независимых точках сетки. Но это говорит о том, что multi-threading
быстрее и должно быть предпочтительным, если у кого-то нет проблемы с GIL (чего у меня нет). Итак, для вышеуказанного времени я использовал dask.delay
параметр планирования по умолчанию, который использует многопоточность для одного процесса.
Упрощенный код
import numpy as np
import pandas as pd
import xarray as xr
from datetime import datetime
from dask import compute, delayed
def add_data_from_small_file(lat):
""" for each grid-point, get time steps from big-file as per mask, and
compute data from small file for those time-steps
Returns: array per latitude which is to be stacked
"""
for lon in range(0,360):
# get time steps from big file
start_time = big_file.time.values[mask1[:, la, lo]]
end_time = big_file.time.values[[mask2[:,la,lo]]
i=0
for t1, t2 in zip(start_time, end_time):
# calculate value from small file for each time pair
temp_var[i] = small_file.sel(t=slice(t1, t2)).median()
i=i+1
temp_per_lon[:, lon] = temp_var
return temp_per_lon
if __name__ == '__main__':
t1 = datetime.now()
small_file = xr.open_dataarray('small_file.nc') # size < 1 GB, 10000x91
big_file = xr.open_dataset('big_file.nc') # size = 4 GB, 10000x91x360
delayed_values = [delayed(add_data_from_small_file)(lat) for lat in range(0,10)] # 10 loops for testing, to scale to 90 loops
# have to delay stacking to avoid memory error
stack_arr = delayed(np.stack)(delayed_values, axis=1)
stack_arr = stack_arr.compute()
print('Total run time:{}'.format(datetime.now()-t1))