Давайте рассмотрим большой набор данных, записанный как набор файлов netcdf, например, созданный с помощью следующего кода:
import xarray as xr
import numpy as np
import dask
from scipy.stats import median_absolute_deviation
import os
n_epochs = 200
n_times = 1500
n_series = 10000
def some_process_generating_sequential_data_epochs():
return np.random.rand(n_series, n_times)
os.mkdir("temp")
# writes 200 files of 120.1 MB each
for no_epoch in range(n_epochs):
array = xr.DataArray(some_process_generating_sequential_data_epochs()[:, :, None],
dims=("series", "times", "epochs"),
coords={"series": range(n_series),
"times": range(n_times),
"epochs": [no_epoch]})
array.chunk({"series": 1000, "times": 500, "epochs":1})\
.to_netcdf("temp/dummy_file_{}.nc".format(no_epoch))
Теперь, если мы хотим применить агрегирующие функции по измерению эпох (т. Е. измерение, которое разделяет набор данных в файлах), мы можем использовать
dataset = xr.open_mfdataset("temp/dummy_file_*.nc", chunks={"series": 1000, "times": 500, "epochs":1},
concat_dim='epochs', combine='by_coords')
dataset.reduce(np.std, "epochs", allow_lazy=True)\
.compute().to_netcdf("temp/std.nc")
Явное упоминание о чанкинге важно, иначе dask загрузит все файлы, которые не помещаются в память [1 ]
По умолчанию будут выбраны чанки для загрузки целых входных файлов в память одновременно.
Это прекрасно работает с numpy mean, std и median функциями. Однако он не работает с другими функциями (например, scipy.stats.median_absolute_deviation), которые пытаются загрузить весь набор данных и запускают MemoryError
. Я подозреваю, что это связано с проблемой 5 лет на dask github . В этом билете они сообщают
Проблема возникает, когда у вас большой массив взаимодействует со средним, как показано ниже:
(x - x.mean ()). Sum () .compute ()
Текущее решение состоит в том, чтобы заранее явно вычислить среднее значение
(x - x.mean (). compute ()). sum (). compute ()
Моя попытка применить такую стратегию путем изменения версии median_absolute_deviation scipy.stats оказалась безуспешной. Они также предлагают использовать dask.set_options(split_every=2)
(что в настоящее время не рекомендуется в пользу dask.config.set(split_every=2)
), но, похоже, здесь это не поможет.
Существует ли надлежащая, без царапин на голове и более или менее всегда действующая идиома для выполнения этих операций сокращения для этого типа набора данных?