Создание отложенного массива dask путем чтения чанков - PullRequest
0 голосов
/ 06 мая 2020

У меня есть набор огромных наборов геопространственных данных (в основном GeoTIFF, но могут быть и другие форматы), которые я хочу обрабатывать последовательно пространственно windows с использованием dask-распределенных по кластеру. Из-за проблем с компиляцией я должен использовать привязки GDAL python (не rasterio, поэтому не могу использовать xarray), и я хотел бы сопоставить эти файлы с разбитым массивом dask. Следующее работает нормально:

g = gdal.Open("superduperfile.tif")
da = da.from_array(g.ReadAsArray(), chunks=(n_bands, 256, 256))

Однако он загружает весь набор данных в память. Идеальным вариантом было бы ленивое чтение: геотиф уже разбит на части, поэтому если я могу написать функцию чтения, то могу создавать из отложенного. Вот моя попытка:

import numpy as np
import gdal
import dask
import dask.array as da


@dask.delayed            
def read_chunk(fname, xoff, yoff, nx, ny):
    g = gdal.Open(fname)
    return g.ReadAsArray(xoff=xoff, yoff=yoff, xsize=nx, ysize=ny)

# Not the most elegant code!!!!
def build_data_array(fname):
    g = gdal.Open(fname)
    nt = g.RasterCount
    Nx = g.RasterXSize
    Ny = g.RasterYSize

    g = None
    arr = []
    # get_chunks calculates pixel windows using a sensible blocksize
    for xoff, yoff, nx, ny, nume in get_chunks(Nx, Ny):
        if yoff == 0:
            if xoff != 0:
                tmp = [da.from_delayed(x, dtype=np.float32, shape=(n, nny, nnx)) for x, nnx, nny in line]
                arr.append(da.hstack(tmp))

            line = [np.array([read_chunk(fname, xoff, yoff, nx, ny), nx, ny])]
        else:
            line.append(np.array([read_chunk(fname, xoff, yoff, nx, ny), nx, ny]))
    tmp = [da.from_delayed(x, dtype=np.float32, shape=(nt, nny, nnx)) for x, nnx, nny in line]
    arr.append(da.hstack(tmp))

    return da.concatenate(arr, axis=2)

dx = build_data_array("superduper.tif")
dx[dx<0] = np.nan
dx = dx/10000.

Очевидно, это работает, и большинство операций дают результаты, идентичные чтению в памяти. Извлечение одного массива через подмножество (dx[:, 822, 722].compute() например) выполняется немедленно. Тем не менее, da.apply_along_axis занимает много времени (много раз передавая dx[:, 822, 722].compute() в мою функцию напрямую.

solution = da.apply_along_axis(function, 0, dx,
                          dtype=np.float32, shape=(20,))
print(solution[:, 822, 722])

Мне удалось улучшить это, переделав dx на nt, 64, 64, но я Я озадачен, почему мне нужно перегруппировать. Массив в памяти, который я описал вверху (с фрагментами nt,256,256), работает так же быстро, как отложенный перегруппированный массив

...