Xarray Dask.delayed slow: как быстро выбрать / интерполировать между двумя наборами данных - PullRequest
0 голосов
/ 24 сентября 2018

У меня есть два набора данных (называемые satdata и atmosdata).Атмосданные равномерно распределены по широте и долготе.Атмосданные имеют размерность (широта: 713, уровень: 37, долгота: 1440, время: 72) и имеют общий размер 12 ГБ.Атмосданные имеют несколько переменных, таких как температура, влажность и т. Д. Влажность имеет форму (время, уровень, широта, долгота).

Сатданные содержат спутниковые наблюдения и имеют размерность (поперек_трека: 90, канал: 3,время: 32195), с 90 * 3 * 32195 = 8692650 точек данных.Across_track означает FOV спутника через положение трека.Сатданные неравномерно распределены по широте / долготе.Например, satdata.latitude имеет измерение (время, канал, поперек_трека), то же самое для satdata.longitude, satdata.sft.

Переменная 'time' в Atmosdata и satdata содержит время в одном и том же дне, но с разными значениями в этих двух наборах данных.Мне нужно найти атмосферные данные (например, влажность и температура), которые имеют ту же широту, долготу и время, что и спутниковые данные.

Чтобы понять это, я перебираю спутниковые данные, чтобы найти местоположение и время каждого наблюдения;затем я нахожу соответствующие атмосферные данные (сначала ближайшая сетка к местоположению спутниковых данных, затем интерполированная к спутниковому времени).Наконец, я объединяю полученные атмосферные данные из всех итераций в один набор данных.

Часть моего кода выглядит следующим образом с использованием небольшого фрагмента данных.

import xarray as xr
import numpy as np
import dask
import pandas as pd

# define a simple satdata dataset
lon = np.array([[18.717, 18.195, 17.68  ], [18.396, 17.87 , 17.351, ]])
lat = np.array([[-71.472, -71.635, -71.802],
   [-71.52 , -71.682, -71.847]])
sft = np.array([[1, 1, 1],
   [1, 1, 1]])
time = np.array(['2010-09-07T00:00:01.000000000', '2010-09-07T00:00:03.000000000'],dtype='datetime64[ns]')
satdata = xr.Dataset({'sft': (['time','across_track'], sft)}, coords = {'longitude': (['time','across_track'], lon), 'latitude': (['time','across_track'], lat), 'time':time })

# atmosdata
atmoslat = np.array([-71.75, -71.5 , -71.25, -71.  , -70.75, -70.5 , -70.25, -70.  , -69.75 ])
atmoslon = np.array([17.25, 17.5 , 17.75, 18.  , 18.25, 18.5 , 18.75, 19.  , 19.25])
atmostime = np.array(['2010-09-07T00:00:00.000000000', '2010-09-07T01:00:00.000000000'],dtype='datetime64[ns]')

atmosq = np.random.rand(2,9,9)
atmosdata = xr.Dataset({'q': (['time', 'latitude', 'longitude'], atmosq)}, coords={'longitude':(['longitude'], atmoslon), 'latitude': (['latitude'], atmoslat), 'time':(['time'], atmostime)})

# do the matching:
matched = dask.compute(match(atmosdata, satdata),scheduler='processes',  num_workers=20)[0]

Функция сопоставления выглядит следующим образом:

@dask.delayed
def match(atmosdata, satdata):
    newatmos = []
    newind = 0
    # iterate over satdata
    for i in np.ndenumerate(satdata.sft):
        if i[1] != np.nan:
           # find one latitude and longitude of satellite data
           temp_lat = satdata.latitude.isel(time=[i[0][0]], across_track=[i[0][1]])
           temp_lon = satdata.longitude.isel(time=[i[0][0]],  across_track=[i[0][1]])
           # find the atmosdata in the grid nearest to this location
           temp_loc  =  atmosdata.sel(latitude =temp_lat.values.ravel()[0], longitude = temp_lon.values.ravel()[0], method='nearest')
           if temp_loc.q.all() > 0:
               # find atmosdata at the satellite time by interpolation
               temp_time = satdata.time.isel(time=[i[0][0]])
               newatmos.append(temp_loc.interp( time = temp_time.data.ravel() ))
               newind += 1

    return xr.concat(newatmos,dim=pd.Index(range(newind), name='NewInd'))

1)Когда я запускаю код, он работает.Но если я не использую небольшой размер данных в коде, вместо этого я использую свои исходные данные (с размерами, указанными выше), тогда я запускаю вычисления и получаю ошибки.

---> 52 matched = dask.compute(match(ecmwfdata, ssmis_land), scheduler='processes', num_workers=20 )
error: 'i' format requires -2147483648 <= number <= 2147483647

2) ЕслиЯ использую наборы данных с другими измерениями, satdata (поперек_трека: 90, канал: 3, время: 100) и атмосферные данные (широта: 71, уровень: 37, долгота: 1440, время: 72), вычисление занимает очень много времени.Я думаю, что мое кодирование не является оптимальным для использования DASK для быстрого решения этой проблемы.

2) Есть ли лучший способ, чем использование цикла for?Цикл for может не использовать DASK для быстрых вычислений?

3) Будет ли хорошей идеей объединить фрагменты в сатдаты, найти предел широты и долготы в блоке сатданных, затем скомпоновать атмосданные в соответствии с этим пределом и, наконец, применить функцию соответствия к каждомукусок сатдаты и атмосфаты?Если это хорошая идея, я пока не знаю, как выполнить итерацию каждого фрагмента satdata вручную ....

4) Функция использует два аргумента, satdata и atmosdata.Поскольку эти два набора данных могут быть довольно большими (12G для atmosdata), вычисления будут выполняться медленнее, чем?

5) В функции, которую я должен был использовать .value в выделении, будет ли это замедлять вычисление, когдаВы используете большие входные данные?

Заранее спасибо!

С уважением

Xiaoni

...