Чтение большого количества файлов паркета: read_parquet vs from_delayed - PullRequest
1 голос
/ 27 января 2020

Я читаю большее количество (от 100 до 1000 с) файлов паркета в одном фрейме данных dask (один компьютер, все локальное). Я понял, что

files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

намного менее эффективно, чем

from dask import delayed
from fastparquet import ParquetFile

@delayed
def load_chunk(pth):
    return ParquetFile(pth).to_pandas()

ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()

Для моего конкретного применения второй подход (from_delayed) занимает 6 секунд, чтобы завершено, первый подход занимает 39 секунд. В случае dd.read_parquet, кажется, много работы еще до того, как рабочие начинают что-то делать, и довольно много операций transfer-... разбросано по всему графику потока задач. Я хотел бы понять, что здесь происходит. Что может быть причиной того, что read_parquet подход намного медленнее? Чем это отличается от простого чтения файлов и помещения их в куски?

1 Ответ

3 голосов
/ 27 января 2020

Вы наблюдаете, как клиент пытается установить sh минимальную / максимальную статистику столбцов данных и, таким образом, устанавливает sh хороший индекс для фрейма данных. Индекс может быть очень полезен для предотвращения чтения из файлов данных, которые не нужны для вашей конкретной работы.

Во многих случаях это хорошая идея, когда объем данных в файле велик, а общий количество файлов мало. В других случаях та же информация может содержаться в специальном файле «_metadata», так что не нужно будет сначала читать все файлы.

Чтобы предотвратить проверку нижних колонтитулов файлов, вы должен вызвать

dd.read_parquet(..,. gather_statistics=False)

Это должно быть по умолчанию в следующей версии dask.

...