Мне нужно сохранить массивы dask в hdf5 при использовании распределенного dask. Моя ситуация очень похожа на описанную в этом выпуске: https://github.com/dask/dask/issues/3351. В основном этот код будет работать:
import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock
def create_and_store_dask_array():
data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
data.to_hdf5('test.h5', '/test')
# this fails too
# f = h5py.File('test.h5', 'w')
# dset = f.create_dataset('/matrix', shape=data.shape)
# da.store(data, dset) #
# f.close()
create_and_store_dask_array()
Но как только я попытаюсь задействовать распределенный планировщик, я получаю TypeError: не могу pickle _thread._local objects.
import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock
from dask.distributed import Client, LocalCluster,progress,performance_report
def create_and_store_dask_array():
data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
data.to_hdf5('test.h5', '/test')
# this fails too
# f = h5py.File('test.h5', 'w')
# dset = f.create_dataset('/matrix', shape=data.shape)
# da.store(data, dset) #
# f.close()
cluster = LocalCluster(n_workers=35,threads_per_worker=1)
client =Client(cluster)
create_and_store_dask_array()
В настоящее время я работаю над этим, отправляя свои вычисления в планировщик небольшими частями, собирая результаты в памяти и сохраняя массивы с помощью h5py, но это очень и очень медленный. Может ли кто-нибудь предложить хороший способ решения этой проблемы? Обсуждение проблемы подразумевает, что xarray может взять массив dask и записать его в файл hdf5, хотя это кажется очень медленным.
import xarray as xr
import netCDF4
import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock
cluster = LocalCluster(n_workers=35,threads_per_worker=1)
client =Client(cluster)
data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
#data.to_hdf5('test.h5', '/test')
test = xr.DataArray(data,dims=None,coords=None)
#save as hdf5
test.to_netcdf("test.h5",mode='w',format="NETCDF4")
Если кто-нибудь может предложить способ решения этой проблемы, меня очень интересует найти решение (особенно такое, которое не требует добавления дополнительных зависимостей)
Заранее спасибо,