У меня есть большой массив xarray.Dataset, хранящийся как zarr. Я хочу выполнить с ним некоторые пользовательские операции, которые нельзя сделать, просто используя numpy -подобные функции, с которыми кластер Dask будет работать автоматически. Поэтому я разделяю набор данных на небольшие подмножества и для каждого подмножества отправляю в мой кластер Dask задачу формы
def my_task(zarr_path, subset_index):
ds = xarray.open_zarr(zarr_path) # this returns an xarray.Dataset containing a dask.array
sel = ds.sel(partition_index)
sel = sel.load() # I want to get the data into memory
# then do my custom operations
...
Однако я заметил, что это создает «задачу в задаче»: когда рабочий получает «my_task», он, в свою очередь, отправляет задачи в кластер для загрузки соответствующей части набора данных. Чтобы избежать этого и обеспечить выполнение всей задачи внутри воркера, я отправляю вместо этого задачу:
def my_task_2(zarr_path, subset_index):
with dask.config.set(scheduler="threading"):
my_task(zarr_path, subset_index)
Это лучший способ сделать это? Что лучше всего подходит для такой ситуации?