Dask не восстанавливает разделы из простых (не Hive) паркетных файлов - PullRequest
0 голосов
/ 07 апреля 2020

У меня есть вопрос из двух частей о Dask + Parquet. Я пытаюсь выполнить запросы к кадру данных dask, созданному из многораздельного файла Parquet, следующим образом:

import pandas as pd
import dask.dataframe as dd
import fastparquet

##### Generate random data to Simulate Process creating a Parquet file ######

test_df = pd.DataFrame(data=np.random.randn(10000, 2), columns=['data1', 'data2'])
test_df['time'] = pd.bdate_range('1/1/2000', periods=test_df.shape[0], freq='1S')

# some grouping column
test_df['name'] = np.random.choice(['jim', 'bob', 'jamie'], test_df.shape[0])


##### Write to partitioned parquet file, hive and simple #####

fastparquet.write('test_simple.parquet', data=test_df, partition_on=['name'], file_scheme='simple')
fastparquet.write('test_hive.parquet',   data=test_df, partition_on=['name'], file_scheme='hive')

# now check partition sizes. Only Hive version works.
assert test_df.name.nunique() == dd.read_parquet('test_hive.parquet').npartitions  # works.
assert test_df.name.nunique() == dd.read_parquet('test_simple.parquet').npartitions # !!!!FAILS!!!

Моя цель здесь - иметь возможность быстро фильтровать и обрабатывать отдельные разделы параллельно, используя dask, что-то вроде этого :

df = dd.read_parquet('test_hive.parquet')
df.map_partitions(<something>)   # operate on each partition

Я в порядке с использованием каталога паркет в стиле Hive, но я заметил, что работа над ним занимает значительно больше времени по сравнению с прямым чтением из одного файла паркета.

Может кто-нибудь сказать мне идиоматический c способ достижения этого? Все еще довольно плохо знаком с Dask / Parquet, поэтому извиняюсь, если это запутанный подход.

1 Ответ

1 голос
/ 07 апреля 2020

Возможно, это не было ясно из строки документации, но разделение по значению просто не происходит для «простого» типа файла, поэтому он имеет только один раздел.

Что касается скорости, чтение данные в одном вызове функции выполняются быстрее всего, когда данные настолько малы, особенно если вы намереваетесь выполнить какую-либо операцию, например nunique, которая потребует комбинации значений из разных разделов.

В Dask каждая задача сопряжена с дополнительными издержками, поэтому, если объем работы, выполняемой вызовом, не будет большим по сравнению с этими издержками, вы можете проиграть. Кроме того, доступ к диску обычно не распараллелен, и некоторые части вычислений могут не работать параллельно в потоках, если они содержат GIL. Наконец, , секционированная версия содержит больше метаданных паркета, которые нужно проанализировать.

>>> len(dd.read_parquet('test_hive.parquet').name.nunique())
12
>>> len(dd.read_parquet('test_simple.parquet').name.nunique())
6

TL; DR: убедитесь, что ваши разделы достаточно велики, чтобы занять работу.

(примечание: набор уникальных значений уже очевиден из метаданных паркета, загрузка данных вообще не требуется; но Dask не знает, как выполнить эту оптимизацию, поскольку, в конце концов, некоторые из разделов могут содержит ноль строк)

...