read_parquet в dask действительно медленный по сравнению с Spark - PullRequest
1 голос
/ 06 мая 2020

У меня был опыт использования Spark в прошлом, и, честно говоря, исходя из опыта, в основном, python, это был довольно большой скачок. Когда я увидел dask , я подумал, что это будет намного лучшее решение для распределенных вычислений, однако, похоже, я столкнулся с проблемой простого чтения паркетных файлов.

У меня s3 ведро, скажем s3://some-bucket/. И в этом у меня есть мои паркетные файлы, сохраненные как разделение улья, как показано ниже.

├── year=2019
    └── month=01
    └── month=01
    └── ....     
└── year=2020
    └── month=01
        └── day=01
            ├── datasource=browser
                └── ...
            └── datasource=iphone
                └── part2.parquet
                └── part2.parquet
        └── ...

Нет файла _metadata или _common_metadata .

Чтобы продемонстрировать, сколько более медленное чтение это в dask по сравнению с Spark Я могу привести пример. В Spark это занимает ~ 90 секунд:

df = spark.read.parquet("s3://some-bucket/year=2020/month=04/")


df.createOrReplaceTempView("df")

res = spark.sql("SELECT column1, SUM(column2) FROM df GROUP BY column1")

res = res.toPandas()  # convert to pandas to be fair to dask, but we know this isn't fast in spark

То же самое в dask занимает ~ 800 секунд:

cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)

cluster.scale(10)

df = dd.read_parquet("s3://some-bucket/year=2020/month=04/*/*/*.parquet",
                     columns=["column1", "column2"], 
                     engine="pyarrow",
                     gather_statistics=False,
)

res = df.groupby('column1').column2.sum().compute()

Использование "s3://some-bucket/year=2020/month=04/" вместо "s3://some-bucket/year=2020/month=04/*/*/*.parquet" требует ~ 2100 секунд.

Одна вещь, которую я уже пробовал, - это чтение метаданных только одного из маленьких кусочков паркета, извлечение схемы pyarrow и передача ее как kwarg вместе с validate_schema=False. Примерно так:

import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()

# we pick a small part of the dataset
ds = pq.ParquetDataset(
    "s3://some-bucket/year=2020/month=04/day=01/datasource=iphone/part1.parquet",
filesystem=s3)

# read in the table and write it out to a temp parquet
tbl = ds.read()
pq.write_table(tbl, "tmp.parquet")

# read in the metadata using pyarrow and extract the pyarrow._parquet.Schema
pa_metadata = pq.read_metadata("tmp.parquet")
pa_schema = pa_metadata.schema

df_dask = dd.read_parquet("s3://some-bucket/year=2020/month=04/day=01/*/*.parquet",
                     columns=["column1", "column2"], 
                     engine="pyarrow",
                     gather_statistics=False,
                     dataset=dict(validate_schema=False, schema=pa_schema)
)

При использовании этого метода, глядя на раздел всего за один день, я вижу значительное ускорение (~ 4x). Как только я просматриваю данные за месяц, мои рабочие перестают работать (я предполагаю, потому что dask пытается прочитать слишком много данных в один конкретный c узел?).

Я не могу изменить структуру моих данных, к сожалению. Прочитав это , я понял, что если бы у меня был файл _metadata или _common_metadata, то я бы увидел значительное ускорение. Однако для меня это невозможно.

Итак, почему dask намного медленнее, чем Spark в этом конкретном случае использования? И, в частности, могу ли я что-нибудь сделать, чтобы ускорить чтение паркетных файлов в dask?

Дополнительные сведения * Всего более 1000 столбцов. * Данные сохраняются с использованием Java (мы не можем это изменить) * Версии пакетов - dask==2.15.0, dask-yarn==0.8.1, distributed==2.15.2, pyarrow==0.17.0 * Каждый day=* обычно занимает около 6,4 ГБ на жестком диске. Размер самого большого раздела datasource=* составляет около 1,5 ГБ. Размер отдельных паркетных файлов составляет от 13 МБ до 150 МБ * Мы пробовали fastparquet в качестве альтернативы движку pyarrow, но это было медленнее, чем pyarrow

...