Я читаю большее количество (от 100 до 1000 с) файлов паркета в одном фрейме данных dask (один компьютер, все локальное). Я понял, что
files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
намного менее эффективно, чем
from dask import delayed
from fastparquet import ParquetFile
@delayed
def load_chunk(pth):
return ParquetFile(pth).to_pandas()
ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
Для моего конкретного применения второй подход (from_delayed
) занимает 6 секунд, чтобы завершено, первый подход занимает 39 секунд. В случае dd.read_parquet
, кажется, много работы еще до того, как рабочие начинают что-то делать, и довольно много операций transfer-...
разбросано по всему графику потока задач. Я хотел бы понять, что здесь происходит. Что может быть причиной того, что read_parquet
подход намного медленнее? Чем это отличается от простого чтения файлов и помещения их в куски?