Переменная не может быть выбрана ошибка с scheduler = "Процессы" при использовании dask.array из файла netcdf - PullRequest
1 голос
/ 03 апреля 2019

Я пытаюсь использовать dask для обработки трехмерных массивов (x, y, time).Эти массивы хранятся в виде файлов netcdf4 и были написаны с использованием библиотеки python netCDF4.Я могу создать массив dask.array из переменной, определенной в моем файле netcdf, когда я пытаюсь вычислить результат, используя scheduler = "Процессы", возникает следующая ошибка:

NotImplementedError: Variable is not picklable

Я знаю, что параллельная запись не поддерживается при использовании dask с netcdf , однако ошибка возникает при простом вычислении среднего по оси времени. Этот пост , похоже, связан, но не помогает решить мою проблему.Вычисление среднего по оси времени только для демонстрации.На практике я буду применять более сложные функции, которые только частично основаны на numpy, поэтому я хочу обойти глобальную блокировку интерпретатора Python, используя процессы в dask.

import dask.array as da
import netCDF4

path = 'path/to/netcdf_file'
dset = netCDF4.Dataset(path, 'r')
var = dset['var']

x = da.from_array(var, chunks=(500, 500, 2))
dset.close()

result = da.mean(x, axis=2)

# raises NotImplementedError: Variable is not picklable
result.compute(scheduler="processes")

# works just fine
result.compute(scheduler="threads")

Почему .compute(scheduler="processes") вызывает ошибкуи каковы возможные решения?Поскольку у меня много файлов в формате netcdf4, я бы хотел избежать преобразования всех файлов в другой формат.

Я использую Python 2.7 (дистрибутив miniconda) в CentOS 7. Dask v1.1.4 и netCDF4 v1.4.3.2 были установлены из conda-forge.

1 Ответ

1 голос
/ 03 апреля 2019

При использовании многопроцессорного планировщика Dask (например, scheduler="processes"), процесс должен будет обрабатывать открытие и доступ к данным netCDF индивидуально.Я настоятельно рекомендую использовать Xarray для этой задачи, поскольку он имеет встроенную поддержку для работы с netCDF и Dask.Я выписал эквивалентный рабочий процесс, используя Xarray / Dask / netCDF4 ниже:

import dask
import xarray as xr

ds = xr.open_dataset('path/to/netcdf_file',
                     engine='netcdf4',
                     chunks={'x': 500, 'y': 500, 'z': 2})

with dask.config.set(scheduler='processes'):
    result = ds['var'].mean(dim='z').load()

В документации Xarray есть хорошая страница, обсуждающая, как использовать dask в этом контексте.

...