Я хочу выполнить N = 1000 начальной загрузки с заменой на сеточные данные.Одно вычисление занимает около 0,5 с.У меня есть доступ к суперкомпьютерному узлу с 48 ядрами.Поскольку повторная выборка не зависит друг от друга, я наивно надеюсь распределить рабочую нагрузку по всем или по крайней мере многим ядрам и получить увеличение производительности на 0,8 * ncores.Но я не понимаю.
Мне все еще не хватает правильного понимания о сумраке.Основываясь на рекомендациях по настройке количества рабочих, работающих на рабочих местах , я использую:
from dask.distributed import Client
client = Client(processes=False, threads_per_worker=8, n_workers=6, memory_limit=‘32GB')
Я также пытался использовать SLURMCluster, но, думаю, сначала мне нужно понять, что я делаю, а затем масштабировать.
Мой MWE:
- создать пример данных
- функция записи Я хочу применить
- функция записи в элементах ресэмплинга
- записьфункция начальной загрузки с начальной загрузкой (= N) в качестве аргумента: см. множество реализаций ниже
- выполнить загрузку
import dask
import numpy as np
import xarray as xr
from dask.distributed import Client
inits = np.arange(50)
lats = np.arange(96)
lons = np.arange(192)
data = np.random.rand(len(inits), len(lats), len(lons))
a = xr.DataArray(data,
coords=[inits, lats, lons],
dims=['init', 'lat', 'lon'])
data = np.random.rand(len(inits), len(lats), len(lons))
b = xr.DataArray(data,
coords=[inits, lats, lons],
dims=['init', 'lat', 'lon'])
def func(a,b, dim='init'):
return (a-b).std(dim)
bootstrap=96
def resample(a):
smp_init = np.random.choice(inits, len(inits))
smp_a = a.sel(init=smp_init)
smp_a['init'] = inits
return smp_a
# serial function
def bootstrap_func(bootstrap=bootstrap):
res = (func(resample(a),b) for _ in range(bootstrap))
res = xr.concat(res,'bootstrap')
# leave out quantile because not issue here yet
#res_ci = res.quantile([.05,.95],'bootstrap')
return res
@dask.delayed
def bootstrap_func_delayed_decorator(bootstrap=bootstrap):
return bootstrap_func(bootstrap=bootstrap)
def bootstrap_func_delayed(bootstrap=bootstrap):
res = (dask.delayed(func)(resample(a),b) for _ in range(bootstrap))
res = xr.concat(dask.compute(*res),'bootstrap')
#res_ci = res.quantile([.05,.95],'bootstrap')
return res
for scheduler in ['synchronous','distributed','multiprocessing','processes','single-threaded','threads']:
print('scheduler:',scheduler)
def bootstrap_func_delayed_processes(bootstrap=bootstrap):
res = (dask.delayed(func)(resample(a),b) for _ in range(bootstrap))
res = xr.concat(dask.compute(*res, scheduler=scheduler),'bootstrap')
res = res.quantile([.05,.95],'bootstrap')
return res
%time c = bootstrap_func_delayed_processes()
Следующие результаты получены на моем 4-ядерном ноутбуке.Но на суперкомпьютере я тоже не вижу ускорения, скорее уменьшу на 50%.
Результаты для серийного:
%time c = bootstrap_func()
CPU times: user 814 ms, sys: 58.7 ms, total: 872 ms
Wall time: 862 ms
Результаты для параллельного:
%time c = bootstrap_func_delayed_decorator().compute()
CPU times: user 96.2 ms, sys: 50 ms, total: 146 ms
Wall time: 906 ms
Результаты дляраспараллелено из цикла:
scheduler: synchronous
CPU times: user 2.57 s, sys: 330 ms, total: 2.9 s
Wall time: 2.95 s
scheduler: distributed
CPU times: user 4.51 s, sys: 2.74 s, total: 7.25 s
Wall time: 8.86 s
scheduler: multiprocessing
CPU times: user 4.18 s, sys: 2.53 s, total: 6.71 s
Wall time: 7.95 s
scheduler: processes
CPU times: user 3.97 s, sys: 2.1 s, total: 6.07 s
Wall time: 7.39 s
scheduler: single-threaded
CPU times: user 2.26 s, sys: 275 ms, total: 2.54 s
Wall time: 2.47 s
scheduler: threads
CPU times: user 2.84 s, sys: 341 ms, total: 3.18 s
Wall time: 2.66 s
Ожидаемые результаты: - ускорение (на 0,8 * ncores)
Другие соображения: - Я также проверил, должен ли я собирать данные в свои блоки.слишком образец кусков.Массивы по кусочкам занимают больше времени.
Мои вопросы: - Что я ошибся в распараллеливании?- Не так ли полезна настройка клиента?- Я реализовал dask.delayed не достаточно умным?- Моя последовательная функция уже выполняется параллельно из-за dask?Я думаю нет.