У меня есть набор огромных наборов геопространственных данных (в основном GeoTIFF, но могут быть и другие форматы), которые я хочу обрабатывать последовательно пространственно windows с использованием dask-распределенных по кластеру. Из-за проблем с компиляцией я должен использовать привязки GDAL python (не rasterio, поэтому не могу использовать xarray), и я хотел бы сопоставить эти файлы с разбитым массивом dask. Следующее работает нормально:
g = gdal.Open("superduperfile.tif")
da = da.from_array(g.ReadAsArray(), chunks=(n_bands, 256, 256))
Однако он загружает весь набор данных в память. Идеальным вариантом было бы ленивое чтение: геотиф уже разбит на части, поэтому если я могу написать функцию чтения, то могу создавать из отложенного. Вот моя попытка:
import numpy as np
import gdal
import dask
import dask.array as da
@dask.delayed
def read_chunk(fname, xoff, yoff, nx, ny):
g = gdal.Open(fname)
return g.ReadAsArray(xoff=xoff, yoff=yoff, xsize=nx, ysize=ny)
# Not the most elegant code!!!!
def build_data_array(fname):
g = gdal.Open(fname)
nt = g.RasterCount
Nx = g.RasterXSize
Ny = g.RasterYSize
g = None
arr = []
# get_chunks calculates pixel windows using a sensible blocksize
for xoff, yoff, nx, ny, nume in get_chunks(Nx, Ny):
if yoff == 0:
if xoff != 0:
tmp = [da.from_delayed(x, dtype=np.float32, shape=(n, nny, nnx)) for x, nnx, nny in line]
arr.append(da.hstack(tmp))
line = [np.array([read_chunk(fname, xoff, yoff, nx, ny), nx, ny])]
else:
line.append(np.array([read_chunk(fname, xoff, yoff, nx, ny), nx, ny]))
tmp = [da.from_delayed(x, dtype=np.float32, shape=(nt, nny, nnx)) for x, nnx, nny in line]
arr.append(da.hstack(tmp))
return da.concatenate(arr, axis=2)
dx = build_data_array("superduper.tif")
dx[dx<0] = np.nan
dx = dx/10000.
Очевидно, это работает, и большинство операций дают результаты, идентичные чтению в памяти. Извлечение одного массива через подмножество (dx[:, 822, 722].compute()
например) выполняется немедленно. Тем не менее, da.apply_along_axis
занимает много времени (много раз передавая dx[:, 822, 722].compute()
в мою функцию напрямую.
solution = da.apply_along_axis(function, 0, dx,
dtype=np.float32, shape=(20,))
print(solution[:, 822, 722])
Мне удалось улучшить это, переделав dx
на nt, 64, 64
, но я Я озадачен, почему мне нужно перегруппировать. Массив в памяти, который я описал вверху (с фрагментами nt,256,256
), работает так же быстро, как отложенный перегруппированный массив