Почему при использовании dask производительность при использовании Zarr намного лучше, чем у паркета? - PullRequest
0 голосов
/ 27 февраля 2020

Когда я выполняю по существу те же вычисления с dask для данных zarr и данных паркета, вычисления на основе zarr выполняются значительно быстрее. Почему? Может быть, это из-за того, что я сделал что-то не так, когда создавал файлы паркета?

Я повторил проблему с поддельными данными (см. Ниже) в блокноте Jupyter, чтобы проиллюстрировать то поведение, которое я вижу. Я был бы признателен за понимание того, почему расчет на основе zarr на несколько порядков быстрее, чем расчет на основе паркета.

Данные, с которыми я работаю в реальной жизни, - это данные модели наук о Земле. Конкретные параметры данных не важны, но каждый параметр может рассматриваться как массив с широтой, долготой и временными измерениями.

Чтобы создать zarr-файлы, я просто записываю многомерную структуру моего параметра и его размеры.

Чтобы создать паркет, я сначала «сплющил» массив трехмерных параметров в одномерный массив, который становится одним столбцом в моем фрейме данных. Затем я добавляю столбцы широты, долготы и времени перед записью фрейма данных в виде паркета.


В этой ячейке есть все импорты, необходимые для остальной части кода:

import pandas as pd
import numpy as np
import xarray as xr
import dask
import dask.array as da
import intake
from textwrap import dedent

Эта ячейка генерирует поддельные файлы данных, размер которых составляет чуть более 3 гигабайт:

def build_data(lat_resolution, lon_resolution, ntimes):
    """Build a fake geographical dataset with ntimes time steps and 
       resolution lat_resolution x lon_resolution"""
    lats = np.linspace(-90.0+lat_resolution/2,
                       90.0-lat_resolution/2,
                       np.round(180/lat_resolution))
    lons = np.linspace(-180.0+lon_resolution/2,
                       180-lon_resolution/2,
                       np.round(360/lon_resolution))
    times = np.arange(start=1,stop=ntimes+1)

    data = np.random.randn(len(lats),len(lons),len(times))
    return lats,lons,times,data

def create_zarr_from_data_set(lats,lons,times,data,zarr_dir):
    """Write zarr from a data set corresponding to the data passed in."""
    dar = xr.DataArray(data,
                       dims=('lat','lon','time'),
                       coords={'lat':lats,'lon':lons,'time':times},
                       name="data")
    ds = xr.Dataset({'data':dar,
                     'lat':('lat',lats),
                     'lon':('lon',lons),
                     'time':('time',times)})
    ds.to_zarr(zarr_dir)

def create_parquet_from_data_frame(lats,lons,times,data,parquet_file):
    """Write a parquet file from a dataframe corresponding to the data passed in."""
    total_points = len(lats)*len(lons)*len(times)

    # Flatten the data array
    data_flat = np.reshape(data,(total_points,1))

    # use meshgrid to create the corresponding latitude, longitude, and time 
    # columns
    mesh = np.meshgrid(lats,lons,times,indexing='ij')
    lats_flat = np.reshape(mesh[0],(total_points,1))
    lons_flat = np.reshape(mesh[1],(total_points,1))
    times_flat = np.reshape(mesh[2],(total_points,1))

    df = pd.DataFrame(data = np.concatenate((lats_flat,
                                             lons_flat,
                                             times_flat, 
                                             data_flat),axis=1), 
                      columns = ["lat","lon","time","data"])
    df.to_parquet(parquet_file,engine="fastparquet")

def create_fake_data_files():
    """Create zarr and parquet files with fake data"""
    zarr_dir = "zarr"
    parquet_file = "data.parquet"

    lats,lons,times,data = build_data(0.1,0.1,31)
    create_zarr_from_data_set(lats,lons,times,data,zarr_dir)
    create_parquet_from_data_frame(lats,lons,times,data,parquet_file)

    with open("data_catalog.yaml",'w') as f:
        catalog_str = dedent("""\
            sources:
              zarr:
                args:
                  urlpath: "./{}"
                description: "data in zarr format"
                driver: intake_xarray.xzarr.ZarrSource
                metadata: {{}}
              parquet:
                args:
                  urlpath: "./{}"
                description: "data in parquet format"
                driver: parquet
        """.format(zarr_dir,parquet_file))
        f.write(catalog_str)


##
# Generate the fake data
##
create_fake_data_files()

Я выполнил несколько различных видов вычислений для файлов parquet и zarr, но для простоты в этом примере Я просто вытяну одно значение параметра в определенное время, широту и долготу.

Эта ячейка строит графики ацикли, ориентированные на zarr и parquet c (DAG) для вычисления:

# pick some arbitrary point to pull out of the data
lat_value = -0.05
lon_value = 10.95
time_value = 5

# open the data
cat = intake.open_catalog("data_catalog.yaml")
data_zarr = cat.zarr.to_dask()
data_df = cat.parquet.to_dask()

# build the DAG for getting a single point out of the zarr data
time_subset = data_zarr.where(data_zarr.time==time_value,drop=True)
lat_condition = da.logical_and(time_subset.lat < lat_value + 1e-9, time_subset.lat > lat_value - 1e-9)
lon_condition = da.logical_and(time_subset.lon < lon_value + 1e-9, time_subset.lon > lon_value - 1e-9)
geo_condition = da.logical_and(lat_condition,lon_condition)
zarr_subset = time_subset.where(geo_condition,drop=True)

# build the DAG for getting a single point out of the parquet data
parquet_subset = data_df[(data_df.lat > lat_value - 1e-9) & 
                         (data_df.lat < lat_value + 1e-9) &
                         (data_df.lon > lon_value - 1e-9) & 
                         (data_df.lon < lon_value + 1e-9) &
                         (data_df.time == time_value)]

Когда я запускаю время против вычислений для каждого из DAG, я получаю совершенно разные времена. Подмножество на основе zarr занимает менее секунды. Подмножество на основе паркета занимает 15-30 секунд.

В этой ячейке выполняется расчет на основе zarr:

%%time
zarr_point = zarr_subset.compute()

Время расчета на основе Zarr:

CPU times: user 6.19 ms, sys: 5.49 ms, total: 11.7 ms
Wall time: 12.8 ms

В этой ячейке выполняется расчет на основе паркета:

%%time
parquet_point = parquet_subset.compute()

Время расчета на основе паркета:

CPU times: user 18.2 s, sys: 28.1 s, total: 46.2 s
Wall time: 29.3 s

Как видите, расчет на основе zarr выполняется намного, намного быстрее. Почему?

1 Ответ

1 голос
/ 27 февраля 2020

Рад видеть, что fastparquet, zarr и intake используются в одном и том же вопросе!

TL; DR здесь: используйте правильную модель данных, соответствующую вашей задаче.

Также стоит отметить, что набор данных zarr имеет размер 1,5 ГБ, blosc / lz4 сжат в 512 чанках, а набор данных паркета - 1,8 ГБ, snappy сжатый в 5 чанках, где оба значения сжатия являются значениями по умолчанию. Случайные данные плохо сжимаются, координаты -.

zarr - это формат, ориентированный на массив, и его можно разбить на любое измерение, что означает, что для чтения одной точки вам нужны только метаданные ( это очень краткий текст) и тот, который содержит его - который в этом случае должен быть распакован. Индексирование блоков данных неявно.

parquet - это формат, ориентированный на столбцы. Чтобы найти указанную c точку, вы можете игнорировать некоторые чанки, основанные на метаданных столбца min / max для каждого чанка, в зависимости от того, как организованы столбцы координат, а затем загрузить чанк столбца для случайных данных и распаковать , Вам потребуется пользовательский лог c, чтобы иметь возможность выбирать чанки для загрузки в несколько столбцов одновременно, что в настоящее время в Dask не реализовано (и было бы невозможно без тщательного переупорядочения данных). Метаданные для паркета намного больше, чем для zarr, но оба незначительны в этом случае - если у вас много переменных или больше координат, это может стать дополнительной проблемой для паркета.

В этом случае произвольный доступ будет намного быстрее для zarr, но чтение всех данных не радикально отличается, так как оба должны загружать все байты на dis c и распаковывать их в числа с плавающей запятой, и в обоих случаях данные координат загружаются быстро. Однако представление в памяти несжатого фрейма данных намного больше, чем для несжатого массива, поскольку вместо одномерного небольшого массива для каждой координаты теперь у вас есть массивы для каждой координаты с тем же числом точек, что и для случайных данных; плюс, опять же, найти конкретную точку можно путем индексации небольших массивов, чтобы получить правильную координату в случае массива, и путем сравнения с каждым значением в широте / долготе каждой отдельной точки для случая блока данных.

...