как загружать и обрабатывать zarr файлы, используя dask и xarray - PullRequest
1 голос
/ 17 апреля 2020

У меня есть ежемесячные файлы zarr в s3, которые содержат данные о температуре в сетке. Я хотел бы собрать данные за несколько месяцев для одного широты и долготы и создать кадр данных этого временного ряда. Некоторый псевдокод:

datasets=[]
for file in files:
    s3 = s3fs.S3FileSystem()        
    zarr_store = s3fs.S3Map(file, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                       )
    datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()

, поэтому этот код будет работать, но невероятно медленный. Я надеялся использовать Dask, чтобы ускорить это. Мой план состоял в том, чтобы изменить метод, чтобы обрабатывать один файл за раз и возвращать фрейм данных. Затем я вызову client.map () и сгенерирую все dfs, а затем объединю их в конце. Поэтому я обнаружил нечто похожее на это:

def load(file, lat: float, long: float, start_date, end_date):

    s3 = s3fs.S3FileSystem()
    s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
    zarr_store = s3fs.S3Map(s3_path, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)

    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                 )

    tmp = x.result().to_array().values
    df_time = zarr.coords['time'].sel(time=slice(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))).values
    df = pd.DataFrame({'time': df_time, 'lat': lat, 'long': long, 'dat': tmp})
    df.set_index(['time', 'lat', 'long'], inplace=True)

    return df

if __name__ == '__main__':
    client = Client('tcp://xxx')

    start_date = date(2000, 1, 7)
    end_date = date(2000, 10, 20)
    lat = 2
    lon = 10

    # get the s3 locations of the zarr files from the db
    files = get_files()

    # try just running with one file
    res = client.submit(load, files[0], lat, lon, start_date, end_date) 

    # run them all
    future = client.map(load, files,
                        repeat(lat), repeat(lon),
                        repeat(start_date), repeat(end_date))
    x = client.gather(future)

Этот код работает нормально, когда я подключаю клиента только к моей локальной машине. Но когда я пытаюсь подключиться к удаленному кластеру, я получаю следующую ошибку при вызове xr.open_zarr:

KeyError: 'XXX / data.zarr / .zmetadata'

Я попытался изменить код и загрузка zarrs вне вызова метода и передача их внутрь, но в результате я получил только nans. Есть ли что-то, что мне не хватает? Разве это не правильный способ решить то, что я пытаюсь сделать?

Ответы [ 2 ]

4 голосов
/ 21 апреля 2020

Если вы хотите просто извлечь временной ряд в точке, вы можете просто создать клиента Dask и затем позволить xarray параллельно выполнять магию c. В приведенном ниже примере у нас есть только один набор данных zarr, но пока рабочие заняты обработкой фрагментов в каждом файле Zarr, вы ничего не получите от параллельного анализа файлов Zarr.

import xarray as xr
import fsspec
import hvplot.xarray

from dask.distributed import Client

url = 's3://mur-sst/zarr'  # Amazon Public Data

ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)

timeseries = ds['analysed_sst'].sel(time=slice('2015-01-01','2020-01-01'),
                                    lat=43, 
                                    lon=-70).persist()

timeseries.hvplot()

производит:

enter image description here

Здесь находится Full Jupyter Notebook

0 голосов
/ 17 апреля 2020

Был в состоянии решить эту проблему, но оставит ее на тот случай, если в будущем это поможет кому-либо еще.

Так что это оказалось проблемой с разрешениями. У рабочих не было доступа к корзине S3, поэтому я и получил KeyError.

Я по-прежнему открыт для общения с людьми, если это лучший способ массовой загрузки / обработки zarrs, хотя

...