Распределенный Xarray не удалось сериализовать - PullRequest
1 голос
/ 19 июня 2020

Мне нужно повысить дискретизацию с помощью линейной интерполяции некоторых спутниковых изображений, организованных в DataArray. Пока я не запустил код локально, у меня нет проблем, но если я попытаюсь воспроизвести интерполяцию в распределенной системе, я верну эту ошибку:

`Could not serialize object of type tuple`

, чтобы воспроизвести проблему, необходимо переключиться между распределенный или локальный env. вот распространенная версия кода.

n_time = 365
px = 2000 
lat = np.linspace(19., 4., px)
lon = np.linspace(34., 53., px)
time = pd.date_range('1/1/2019', periods=n_time, freq='D')
data = xr.DataArray(np.random.random((n_time, px, px)), dims=('time', 'lat', 
'lon'),coords={'time': time, 'lat': lat, 'lon': lon})
data = data.chunk({'time':1})

#upsampling 
nlat = np.linspace(19., 4., px*2)
nlon = np.linspace(34., 53., px*2)
interp = data.interp(lat=nlat, lon=nlon)
computed = interp.compute()

Есть ли у кого-нибудь идеи и идеи о том, как обойти проблему?

РЕДАКТИРОВАТЬ 1:

Кажется, что я не Это было недостаточно ясно в моем первом MRE, поэтому я решил переписать все данные, полученные до сих пор. Мне нужно увеличить размер набора спутниковых данных с 500 до 250 метров. Конечная цель состоит в том, что, поскольку разбиение по измеряемому измерению еще не поддерживается **, выясните, как я могу создать обходной путь и повысить дискретизацию каждого изображения до 500 наборов данных.

px = 2000
n_time = 365
time = pd.date_range('1/1/2019', periods=n_time, freq='D')

# dataset to be upsampled
lat_500 = np.linspace(19., 4., px)
lon_500 = np.linspace(34., 53., px)
da_500 = xr.DataArray(dsa.random.random((n_time, px, px),
                      chunks=(1, 1000, 1000)),
                      dims=('time', 'lat', 'lon'),
                      coords={'time': time, 'lat': lat_500, 'lon': lon_500})

# reference dataset
lat_250 = np.linspace(19., 4., px * 2)
lon_250 = np.linspace(34., 53., px * 2)
da_250 = xr.DataArray(dsa.random.random((n_time, px * 2, px * 2),
                      chunks=(1, 1000, 1000)),
                      dims=('time', 'lat', 'lon'),
                      coords={'time': time, 'lat': lat_250, 'lon': lon_250})

# upsampling
da_250i = da_500.interp(lat=lat_250, lon=lon_250)

#fake index
fNDVI = (da_250i-da_250)/(da_250i+da_250)

fNDVI.to_netcdf(r'c:\temp\output.nc').compute()

проблема, и избегайте воздействия на память, как предлагает Райан. В любом случае два набора данных могут быть выгружены на диск, а затем перезагружены.

** примечание: кажется, что что-то движется для реализации интерполяции вместе с фрагментированным набором данных, но все еще не полностью доступно. Вот подробности https://github.com/pydata/xarray/pull/4155

Ответы [ 2 ]

1 голос
/ 23 июня 2020

В ответ на ваш отредактированный вопрос у меня есть новое решение.

Чтобы выполнить интерполяцию по размерам широты и долготы, вам нужно повторно разбить данные. Я добавил эту строку перед этапом интерполяции.

da_500 = da_500.chunk({'lat': -1, 'lon': -1})

После этого вычисления были выполнены для меня без ошибок в распределенном режиме.

from dask.distributed import Client
client = Client()
fNDVI.to_netcdf(r'~/tmp/test.nc').compute()

Я заметил, что вычисления были скорее памятью интенсивный. Я рекомендую следить за панелью инструментов dask, чтобы увидеть, не заканчивается ли у вас память.

1 голос
/ 23 июня 2020

Я считаю, что есть две вещи, которые вызывают ошибку в этом примере sh, обе, вероятно, связаны с использованием памяти

  • Вы заполняете исходный набор данных большим numpy массивом (np.random.random((n_time, px, px)), а затем позвоните по номеру .chunk по факту Это вынуждает Dask передавать на своих графиках большой объект. Решение : используйте метод отложенной загрузки.
  • Вашему объекту interp требуется 47 ГБ памяти. Это слишком много для большинства компьютеров. Решение : добавьте шаг сокращения перед вызовом compute. Это позволяет вам проверить, правильно ли работает ваша интерполяция, не загружая одновременно все результаты в ОЗУ.

С этими изменениями код выглядит следующим образом

import numpy as np
import dask.array as dsa
import pandas as pd
import xarray as xr

n_time = 365
px = 2000 
lat = np.linspace(19., 4., px)
lon = np.linspace(34., 53., px)
time = pd.date_range('1/1/2019', periods=n_time, freq='D')

# use dask to lazily create the random data, not numpy
# this avoids populating the dask graph with large objects
data = xr.DataArray(dsa.random.random((n_time, px, px),
                    chunks=(1, px, px)),
                    dims=('time', 'lat', 'lon'),
                    coords={'time': time, 'lat': lat, 'lon': lon})

# upsampling 
nlat = np.linspace(19., 4., px*2)
nlon = np.linspace(34., 53., px*2)
# this object requires 47 GB of memory
# computing it directly is not an option on most computers
interp = data.interp(lat=nlat, lon=nlon)

# instead, we reduce in the time dimension before computing
interp.mean(dim='time').compute()

Это было выполнено в несколько минут на моем ноутбуке.

...