Параллельные задачи над подмножествами массива dask, заключенного в набор данных xarray - PullRequest
2 голосов
/ 13 июля 2020

У меня есть большой массив xarray.Dataset, хранящийся как zarr. Я хочу выполнить с ним некоторые пользовательские операции, которые нельзя сделать, просто используя numpy -подобные функции, с которыми кластер Dask будет работать автоматически. Поэтому я разделяю набор данных на небольшие подмножества и для каждого подмножества отправляю в мой кластер Dask задачу формы

def my_task(zarr_path, subset_index):
    ds = xarray.open_zarr(zarr_path)  # this returns an xarray.Dataset containing a dask.array
    sel = ds.sel(partition_index)
    sel  = sel.load()  # I want to get the data into memory
    # then do my custom operations
    ...

Однако я заметил, что это создает «задачу в задаче»: когда рабочий получает «my_task», он, в свою очередь, отправляет задачи в кластер для загрузки соответствующей части набора данных. Чтобы избежать этого и обеспечить выполнение всей задачи внутри воркера, я отправляю вместо этого задачу:

def my_task_2(zarr_path, subset_index):
    with dask.config.set(scheduler="threading"):
        my_task(zarr_path, subset_index)

Это лучший способ сделать это? Что лучше всего подходит для такой ситуации?

1 Ответ

0 голосов
/ 08 августа 2020

Обычно используются такие методы, как apply_ufunc или map_blocks, чтобы применить функцию параллельно к блокам в наборе данных Xarray.

...