Я запускаю симуляции с использованием dask.distributed. Моя модель определена в отложенной функции, и я складываю несколько реализаций.
Упрощенная версия того, что я делаю, приведена в следующем фрагменте кода:
import numpy as np
import xarray as xr
import dask.array as da
import dask
from dask.distributed import Client
from itertools import repeat
@dask.delayed
def run_model(n_time,a,b):
result = np.array([a*np.random.randn(n_time)+b])
return result
client = Client()
# Parameters
n_sims = 10000
n_time = 100
a_vals = np.random.randn(n_sims)
b_vals = np.random.randn(n_sims)
output_file = 'out.nc'
# Run simulations
out = da.stack([da.from_delayed(run_model(n_time,a,b),(1,n_time,),np.float64) for a,b in zip(a_vals, b_vals)])
# Store output in a dataframe
ds = xr.Dataset({'var1': (['realization', 'time'], out[:,0,:])},
coords={'realization': np.arange(n_sims),
'time': np.arange(n_time)*.1})
# Save to a netcdf file -> at this point, computations will be carried out
ds.to_netcdf(output_file)
Если я хочу запустить много симуляций, я получаю следующее предупреждение:
/home/user/miniconda3/lib/python3.6/site-packages/distributed/worker.py:840: UserWarning: Large object of size 2.73 MB detected in task graph:
("('getitem-32103d4a23823ad4f97dcb3faed7cf07', 0, ... cd39>]), False)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
Насколько я понимаю (из этого и этого вопроса), метод, предложенный предупреждением, помогает в получении больших данных в функцию. Однако все мои входные данные являются скалярными значениями, поэтому они не должны занимать почти 3 МБ памяти. Даже если функция run_model()
вообще не принимает никаких аргументов (поэтому параметры не передаются), я получаю то же предупреждение.
Я также взглянул на график задач, чтобы увидеть, есть ли какой-то шаг, требующий загрузки большого количества данных. Для трех реализаций это выглядит так:
Так что мне кажется, что каждая реализация обрабатывается отдельно, что должно поддерживать низкий объем данных.
Я хотел бы понять, что на самом деле представляет собой шаг, который производит большой объект, и что мне нужно сделать, чтобы разбить его на более мелкие части.