Когда я выполняю по существу те же вычисления с dask для данных zarr и данных паркета, вычисления на основе zarr выполняются значительно быстрее. Почему? Может быть, это из-за того, что я сделал что-то не так, когда создавал файлы паркета?
Я повторил проблему с поддельными данными (см. Ниже) в блокноте Jupyter, чтобы проиллюстрировать то поведение, которое я вижу. Я был бы признателен за понимание того, почему расчет на основе zarr на несколько порядков быстрее, чем расчет на основе паркета.
Данные, с которыми я работаю в реальной жизни, - это данные модели наук о Земле. Конкретные параметры данных не важны, но каждый параметр может рассматриваться как массив с широтой, долготой и временными измерениями.
Чтобы создать zarr-файлы, я просто записываю многомерную структуру моего параметра и его размеры.
Чтобы создать паркет, я сначала «сплющил» массив трехмерных параметров в одномерный массив, который становится одним столбцом в моем фрейме данных. Затем я добавляю столбцы широты, долготы и времени перед записью фрейма данных в виде паркета.
В этой ячейке есть все импорты, необходимые для остальной части кода:
import pandas as pd
import numpy as np
import xarray as xr
import dask
import dask.array as da
import intake
from textwrap import dedent
Эта ячейка генерирует поддельные файлы данных, размер которых составляет чуть более 3 гигабайт:
def build_data(lat_resolution, lon_resolution, ntimes):
"""Build a fake geographical dataset with ntimes time steps and
resolution lat_resolution x lon_resolution"""
lats = np.linspace(-90.0+lat_resolution/2,
90.0-lat_resolution/2,
np.round(180/lat_resolution))
lons = np.linspace(-180.0+lon_resolution/2,
180-lon_resolution/2,
np.round(360/lon_resolution))
times = np.arange(start=1,stop=ntimes+1)
data = np.random.randn(len(lats),len(lons),len(times))
return lats,lons,times,data
def create_zarr_from_data_set(lats,lons,times,data,zarr_dir):
"""Write zarr from a data set corresponding to the data passed in."""
dar = xr.DataArray(data,
dims=('lat','lon','time'),
coords={'lat':lats,'lon':lons,'time':times},
name="data")
ds = xr.Dataset({'data':dar,
'lat':('lat',lats),
'lon':('lon',lons),
'time':('time',times)})
ds.to_zarr(zarr_dir)
def create_parquet_from_data_frame(lats,lons,times,data,parquet_file):
"""Write a parquet file from a dataframe corresponding to the data passed in."""
total_points = len(lats)*len(lons)*len(times)
# Flatten the data array
data_flat = np.reshape(data,(total_points,1))
# use meshgrid to create the corresponding latitude, longitude, and time
# columns
mesh = np.meshgrid(lats,lons,times,indexing='ij')
lats_flat = np.reshape(mesh[0],(total_points,1))
lons_flat = np.reshape(mesh[1],(total_points,1))
times_flat = np.reshape(mesh[2],(total_points,1))
df = pd.DataFrame(data = np.concatenate((lats_flat,
lons_flat,
times_flat,
data_flat),axis=1),
columns = ["lat","lon","time","data"])
df.to_parquet(parquet_file,engine="fastparquet")
def create_fake_data_files():
"""Create zarr and parquet files with fake data"""
zarr_dir = "zarr"
parquet_file = "data.parquet"
lats,lons,times,data = build_data(0.1,0.1,31)
create_zarr_from_data_set(lats,lons,times,data,zarr_dir)
create_parquet_from_data_frame(lats,lons,times,data,parquet_file)
with open("data_catalog.yaml",'w') as f:
catalog_str = dedent("""\
sources:
zarr:
args:
urlpath: "./{}"
description: "data in zarr format"
driver: intake_xarray.xzarr.ZarrSource
metadata: {{}}
parquet:
args:
urlpath: "./{}"
description: "data in parquet format"
driver: parquet
""".format(zarr_dir,parquet_file))
f.write(catalog_str)
##
# Generate the fake data
##
create_fake_data_files()
Я выполнил несколько различных видов вычислений для файлов parquet и zarr, но для простоты в этом примере Я просто вытяну одно значение параметра в определенное время, широту и долготу.
Эта ячейка строит графики ацикли, ориентированные на zarr и parquet c (DAG) для вычисления:
# pick some arbitrary point to pull out of the data
lat_value = -0.05
lon_value = 10.95
time_value = 5
# open the data
cat = intake.open_catalog("data_catalog.yaml")
data_zarr = cat.zarr.to_dask()
data_df = cat.parquet.to_dask()
# build the DAG for getting a single point out of the zarr data
time_subset = data_zarr.where(data_zarr.time==time_value,drop=True)
lat_condition = da.logical_and(time_subset.lat < lat_value + 1e-9, time_subset.lat > lat_value - 1e-9)
lon_condition = da.logical_and(time_subset.lon < lon_value + 1e-9, time_subset.lon > lon_value - 1e-9)
geo_condition = da.logical_and(lat_condition,lon_condition)
zarr_subset = time_subset.where(geo_condition,drop=True)
# build the DAG for getting a single point out of the parquet data
parquet_subset = data_df[(data_df.lat > lat_value - 1e-9) &
(data_df.lat < lat_value + 1e-9) &
(data_df.lon > lon_value - 1e-9) &
(data_df.lon < lon_value + 1e-9) &
(data_df.time == time_value)]
Когда я запускаю время против вычислений для каждого из DAG, я получаю совершенно разные времена. Подмножество на основе zarr занимает менее секунды. Подмножество на основе паркета занимает 15-30 секунд.
В этой ячейке выполняется расчет на основе zarr:
%%time
zarr_point = zarr_subset.compute()
Время расчета на основе Zarr:
CPU times: user 6.19 ms, sys: 5.49 ms, total: 11.7 ms
Wall time: 12.8 ms
В этой ячейке выполняется расчет на основе паркета:
%%time
parquet_point = parquet_subset.compute()
Время расчета на основе паркета:
CPU times: user 18.2 s, sys: 28.1 s, total: 46.2 s
Wall time: 29.3 s
Как видите, расчет на основе zarr выполняется намного, намного быстрее. Почему?