У меня был опыт использования Spark в прошлом, и, честно говоря, исходя из опыта, в основном, python, это был довольно большой скачок. Когда я увидел dask , я подумал, что это будет намного лучшее решение для распределенных вычислений, однако, похоже, я столкнулся с проблемой простого чтения паркетных файлов.
У меня s3 ведро, скажем s3://some-bucket/
. И в этом у меня есть мои паркетные файлы, сохраненные как разделение улья, как показано ниже.
├── year=2019
└── month=01
└── month=01
└── ....
└── year=2020
└── month=01
└── day=01
├── datasource=browser
└── ...
└── datasource=iphone
└── part2.parquet
└── part2.parquet
└── ...
Нет файла _metadata или _common_metadata .
Чтобы продемонстрировать, сколько более медленное чтение это в dask
по сравнению с Spark
Я могу привести пример. В Spark это занимает ~ 90 секунд:
df = spark.read.parquet("s3://some-bucket/year=2020/month=04/")
df.createOrReplaceTempView("df")
res = spark.sql("SELECT column1, SUM(column2) FROM df GROUP BY column1")
res = res.toPandas() # convert to pandas to be fair to dask, but we know this isn't fast in spark
То же самое в dask занимает ~ 800 секунд:
cluster = YarnCluster()
# Connect to the cluster
client = Client(cluster)
cluster.scale(10)
df = dd.read_parquet("s3://some-bucket/year=2020/month=04/*/*/*.parquet",
columns=["column1", "column2"],
engine="pyarrow",
gather_statistics=False,
)
res = df.groupby('column1').column2.sum().compute()
Использование "s3://some-bucket/year=2020/month=04/"
вместо "s3://some-bucket/year=2020/month=04/*/*/*.parquet"
требует ~ 2100 секунд.
Одна вещь, которую я уже пробовал, - это чтение метаданных только одного из маленьких кусочков паркета, извлечение схемы pyarrow и передача ее как kwarg вместе с validate_schema=False
. Примерно так:
import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()
# we pick a small part of the dataset
ds = pq.ParquetDataset(
"s3://some-bucket/year=2020/month=04/day=01/datasource=iphone/part1.parquet",
filesystem=s3)
# read in the table and write it out to a temp parquet
tbl = ds.read()
pq.write_table(tbl, "tmp.parquet")
# read in the metadata using pyarrow and extract the pyarrow._parquet.Schema
pa_metadata = pq.read_metadata("tmp.parquet")
pa_schema = pa_metadata.schema
df_dask = dd.read_parquet("s3://some-bucket/year=2020/month=04/day=01/*/*.parquet",
columns=["column1", "column2"],
engine="pyarrow",
gather_statistics=False,
dataset=dict(validate_schema=False, schema=pa_schema)
)
При использовании этого метода, глядя на раздел всего за один день, я вижу значительное ускорение (~ 4x). Как только я просматриваю данные за месяц, мои рабочие перестают работать (я предполагаю, потому что dask пытается прочитать слишком много данных в один конкретный c узел?).
Я не могу изменить структуру моих данных, к сожалению. Прочитав это , я понял, что если бы у меня был файл _metadata
или _common_metadata
, то я бы увидел значительное ускорение. Однако для меня это невозможно.
Итак, почему dask намного медленнее, чем Spark в этом конкретном случае использования? И, в частности, могу ли я что-нибудь сделать, чтобы ускорить чтение паркетных файлов в dask?
Дополнительные сведения * Всего более 1000 столбцов. * Данные сохраняются с использованием Java (мы не можем это изменить) * Версии пакетов - dask==2.15.0
, dask-yarn==0.8.1
, distributed==2.15.2
, pyarrow==0.17.0
* Каждый day=*
обычно занимает около 6,4 ГБ на жестком диске. Размер самого большого раздела datasource=*
составляет около 1,5 ГБ. Размер отдельных паркетных файлов составляет от 13 МБ до 150 МБ * Мы пробовали fastparquet
в качестве альтернативы движку pyarrow
, но это было медленнее, чем pyarrow