Правильный способ использования функции уменьшения dask для больших наборов данных, хранящихся в виде набора файлов netcdf - PullRequest
0 голосов
/ 06 марта 2020

Давайте рассмотрим большой набор данных, записанный как набор файлов netcdf, например, созданный с помощью следующего кода:

import xarray as xr
import numpy as np
import dask
from scipy.stats import median_absolute_deviation
import os
n_epochs = 200
n_times = 1500
n_series = 10000

def some_process_generating_sequential_data_epochs():
    return np.random.rand(n_series, n_times)

os.mkdir("temp")
# writes 200 files of 120.1 MB each
for no_epoch in range(n_epochs):
    array = xr.DataArray(some_process_generating_sequential_data_epochs()[:, :, None],
                         dims=("series", "times", "epochs"),
                         coords={"series": range(n_series),
                                 "times": range(n_times),
                                 "epochs": [no_epoch]})


    array.chunk({"series": 1000, "times": 500, "epochs":1})\
         .to_netcdf("temp/dummy_file_{}.nc".format(no_epoch)) 

Теперь, если мы хотим применить агрегирующие функции по измерению эпох (т. Е. измерение, которое разделяет набор данных в файлах), мы можем использовать

dataset = xr.open_mfdataset("temp/dummy_file_*.nc", chunks={"series": 1000, "times": 500, "epochs":1}, 
                            concat_dim='epochs', combine='by_coords')

dataset.reduce(np.std, "epochs", allow_lazy=True)\
       .compute().to_netcdf("temp/std.nc")    

Явное упоминание о чанкинге важно, иначе dask загрузит все файлы, которые не помещаются в память [1 ]

По умолчанию будут выбраны чанки для загрузки целых входных файлов в память одновременно.

Это прекрасно работает с numpy mean, std и median функциями. Однако он не работает с другими функциями (например, scipy.stats.median_absolute_deviation), которые пытаются загрузить весь набор данных и запускают MemoryError. Я подозреваю, что это связано с проблемой 5 лет на dask github . В этом билете они сообщают

Проблема возникает, когда у вас большой массив взаимодействует со средним, как показано ниже:

(x - x.mean ()). Sum () .compute ()

Текущее решение состоит в том, чтобы заранее явно вычислить среднее значение

(x - x.mean (). compute ()). sum (). compute ()

Моя попытка применить такую ​​стратегию путем изменения версии median_absolute_deviation scipy.stats оказалась безуспешной. Они также предлагают использовать dask.set_options(split_every=2) (что в настоящее время не рекомендуется в пользу dask.config.set(split_every=2)), но, похоже, здесь это не поможет.

Существует ли надлежащая, без царапин на голове и более или менее всегда действующая идиома для выполнения этих операций сокращения для этого типа набора данных?

1 Ответ

0 голосов
/ 13 марта 2020

Ваш вопрос был довольно длинным, поэтому я не прочитал все это (мои извинения, я пытаюсь ответить на эти вопросы, но их много).

Я думаю, что вы спрашиваете. ..

Как применить функцию произвольного сокращения к большому набору данных?

Ответ таков: вы не можете. Операции типа da.mean переписываются параллельными алгоритмами. Вы не можете просто взять версию Numpy или Scipy и применить ее как есть. Однако, если вы можете немного подумать о том, как разбить операцию сокращения для параллельной работы, вы можете попробовать функцию da.reduction.

В качестве альтернативы, если вы применяете редукцию только по одной оси, вы можете переназначить свои данные так, чтобы эта ось была разбита на отдельные фрагменты, а затем использовать что-то вроде map_blocks, чтобы смущенно применить вашу функцию параллельный путь.

...