Сохранить массив Dask размером больше памяти в файл hdf5 - PullRequest
0 голосов
/ 14 июля 2020

Мне нужно сохранить массивы dask в hdf5 при использовании распределенного dask. Моя ситуация очень похожа на описанную в этом выпуске: https://github.com/dask/dask/issues/3351. В основном этот код будет работать:

import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock


def create_and_store_dask_array():
    data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))

    data.to_hdf5('test.h5', '/test')
    # this fails too
    # f = h5py.File('test.h5', 'w')
    # dset = f.create_dataset('/matrix', shape=data.shape)
    # da.store(data, dset) #
    # f.close()



create_and_store_dask_array()

Но как только я попытаюсь задействовать распределенный планировщик, я получаю TypeError: не могу pickle _thread._local objects.

import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock
from dask.distributed import Client, LocalCluster,progress,performance_report


def create_and_store_dask_array():
    data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))

    data.to_hdf5('test.h5', '/test')
    # this fails too
    # f = h5py.File('test.h5', 'w')
    # dset = f.create_dataset('/matrix', shape=data.shape)
    # da.store(data, dset) #
    # f.close()


cluster = LocalCluster(n_workers=35,threads_per_worker=1)
client =Client(cluster)
create_and_store_dask_array()

В настоящее время я работаю над этим, отправляя свои вычисления в планировщик небольшими частями, собирая результаты в памяти и сохраняя массивы с помощью h5py, но это очень и очень медленный. Может ли кто-нибудь предложить хороший способ решения этой проблемы? Обсуждение проблемы подразумевает, что xarray может взять массив dask и записать его в файл hdf5, хотя это кажется очень медленным.

import xarray as xr
import netCDF4
import dask.array as da
from distributed import Client
import h5py
from dask.utils import SerializableLock
cluster = LocalCluster(n_workers=35,threads_per_worker=1)
client =Client(cluster)
data = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
#data.to_hdf5('test.h5', '/test')
test = xr.DataArray(data,dims=None,coords=None)
#save as hdf5
test.to_netcdf("test.h5",mode='w',format="NETCDF4")

Если кто-нибудь может предложить способ решения этой проблемы, меня очень интересует найти решение (особенно такое, которое не требует добавления дополнительных зависимостей)

Заранее спасибо,

1 Ответ

1 голос
/ 08 августа 2020

Объекты H5Py не сериализуемы, поэтому их сложно перемещать между различными процессами в распределенном контексте. Явный метод to_hdf5 помогает обойти это. Более общий метод store не относится к HDF5 таким же образом.

...