Эффективный способ складирования массивов Dask, сгенерированных из Xarray - PullRequest
0 голосов
/ 12 сентября 2018

Итак, я пытаюсь прочитать большое количество относительно больших файлов netCDF, содержащих гидрологические данные. Все файлы NetCDF выглядят так:

<xarray.Dataset>
Dimensions:         (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
  * time            (time) datetime64[ns] 1993-01-11T21:00:00
  * reference_time  (reference_time) datetime64[ns] 1993-01-01
  * feature_id      (feature_id) int32 101 179 181 183 185 843 845 847 849 ...
Data variables:
    streamflow      (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    q_lateral       (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    velocity        (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    qSfcLatRunoff   (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    qBucket         (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    qBtmVertRunoff  (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
Attributes:
    featureType:                timeSeries
    proj4:                      +proj=longlat +datum=NAD83 +no_defs
    model_initialization_time:  1993-01-01_00:00:00
    station_dimension:          feature_id
    model_output_valid_time:    1993-01-11_21:00:00
    stream_order_output:        1
    cdm_datatype:               Station
    esri_pe_string:             GEOGCS[GCS_North_American_1983,DATUM[D_North_...
    Conventions:                CF-1.6
    model_version:              NWM 1.2
    dev_OVRTSWCRT:              1
    dev_NOAH_TIMESTEP:          3600
    dev_channel_only:           0
    dev_channelBucket_only:     0
    dev:                        dev_ prefix indicates development/internal me...

У меня есть эти данные за 25 лет, и они записываются каждый час. Таким образом, всего около 4 ТБ данных.

Прямо сейчас я просто пытаюсь получить средние значения за день (Ежедневно и Ежемесячно) значений потока. Поэтому я создал следующий скрипт.

import xarray as xr
import dask.array as da
from dask.distributed import Client
import os

workdir = '/path/to/directory/of/files'
files = [os.path.join(workdir, i) for i in os.listdir(workdir)]

client = Client(processes=False, threads_per_worker=4, n_workers=4, memory_limit='750MB')

big_array = []

for i, file in enumerate(files):
    ds = xr.open_dataset(file, chunks={"feature_id": 50000})

    if i == 0:
        print(ds)

    print(ds.streamflow)

    big_array.append(ds.streamflow)

    ds.close()

    if i == 5:
        break

dask_big_array = da.stack(big_array, axis=0)

print(dask_big_array)

Объект ds.streamflow при печати выглядит следующим образом, и, насколько я понимаю, это просто массив Dask:

<xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000]

Странная вещь в том, что когда я складываю массивы, они, похоже, теряют часть, которую я применял к ним ранее. Когда я распечатываю объект big_array, я получаю это:

dask.array<stack, shape=(6, 2729077), dtype=float64, chunksize=(1, 2729077)>

Проблема, с которой я сталкиваюсь, заключается в том, что, когда я пытаюсь запустить этот код, я получаю это предупреждение, а затем я думаю, что память перегружена, поэтому мне нужно убить процесс.

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk...

Итак, я думаю, у меня есть несколько вопросов:

  1. Почему массив dask теряет порцию при штабелировании?
  2. Есть ли более эффективный способ сложить все эти массивы для распараллеливания этого процесса?

Из комментариев это то, что big array:

[<xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
  * feature_id  (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
    long_name:    River Flow
    units:        m3 s-1
    coordinates:  latitude longitude
    valid_range:  [       0 50000000]]

1 Ответ

0 голосов
/ 13 сентября 2018

Проблема здесь в том, что dask.array.stack() не распознает xarray.DataArray объект как содержащий массивы dask, поэтому вместо этого он преобразует их все в массивы NumPy. Вот как вы исчерпали свою память.

Вы можете исправить это несколькими различными способами:

  1. Вызовите dask.array.stack() для списка массива списка, например, переключите big_array.append(ds.streamflow) на big_array.append(ds.streamflow.data).
  2. Используйте xarray.concat() вместо dask.array.stack(), например, писать dask_big_array = xarray.concat(big_array, dim='time').
  3. Используйте xarray.open_mfdataset(), который сочетает в себе процесс открытия многих файлов и их объединения, например, заменяя всю вашу логику здесь на xarray.open_mfdataset('/path/to/directory/of/files/*').
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...