Чрезмерное использование памяти при использовании кадра данных dask, созданного из файла паркета - PullRequest
0 голосов
/ 25 декабря 2018

У меня есть файл паркета, который имеет 800K строк x 8,7K столбцов.Я загрузил его в файл данных dask:

import dask.dataframe as dd
dask_train_df = dd.read_parquet('train.parquet')
dask_train_df.info()

Это дает:

<class 'dask.dataframe.core.DataFrame'>
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)

Когда я пытаюсь выполнить простые операции, такие как dask_train_df.head() или dask_train_df.loc[2:4].compute(), я получаю ошибки памяти,даже с 17+ ГБ ОЗУ.

Однако, если я сделаю:

import pandas as pd
train = pd.read_parquet('../input/train.parquet')
train.info()

, вы получите:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 800000 entries, 0 to 799999
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
memory usage: 6.5 GB

, и я смогу запустить train.head() и train.loc[2:4] без проблем, так как все уже находится в памяти.

1) Итак, мой вопрос: почему эти простые операции увеличивают использование памяти с помощью Dask Dataframe, но прекрасно работают, когда я загружаю все в память, используяPandas Dataframe?

Я заметил, что npartitions=1, и я вижу, что в документации read_parquet "считывает каталог данных Parquet в Dask.dataframe, один файл на раздел".В моем случае это звучит так, как будто я теряю всю мощь распараллеливания, связанную с наличием нескольких разделов, но разве использование памяти Dask Dataframe не должно ограничиваться объемом памяти одного Pandas Dataframe?

2) Кроме того, дополнительный вопрос: если бы я хотел распараллелить этот единственный файл паркета, разбив его на Dask Dataframe, как бы я это сделал?Я не вижу параметра размера блока в сигнатуре dd.read_parquet.Я также пытался использовать функцию перераспределения, но я считаю, что разделы вдоль строк и в файле паркета, я хотел бы разделить вдоль столбцов?

1 Ответ

0 голосов
/ 29 декабря 2018

Во-первых, я хотел бы прокомментировать, что 8712 столбцов довольно много, и вы обнаружите, что анализ схемы / метаданных может занять значительное время, не говоря уже о загрузке данных.

Когда fastparquet загружает данные, онсначала выделяет фрейм данных достаточного размера, затем выполняет итерацию по столбцам / фрагментам (с соответствующими издержками, которые в данном случае, очевидно, являются небольшими) и назначает значения в выделенный фрейм данных.

Когда вы выполняете вычисления через Dask (любой расчет), во многих случаях могут быть внутризадачные копии в памяти входных переменных и других промежуточных объектов.Обычно это не проблема, поскольку весь набор данных должен быть разбит на множество частей, а накладные расходы памяти небольших промежуточных узлов - это цена, которую стоит заплатить за способность обрабатывать наборы данных, большие, чем память.Я не уверен, в какой момент вы получаете копию, возможно, стоит исследовать и предотвратить.

В вашем случае весь набор данных представляет собой один раздел.Это приведет к одной задаче загрузки, запущенной в одном потоке.Вы не будете получать никакого параллелизма, и любые промежуточные внутренние копии применяются ко всему набору данных.Вы могли бы загружать только часть данных, выбирая столбцы, и, таким образом, изготавливать разделы и таким образом достигать параллелизма.Однако типичный способ обработки данных паркета состоит в использовании разделов «группы строк» ​​(т. Е. Вдоль индекса) и нескольких файлов, поэтому реальным способом избежать проблемы является использование данных, которые уже соответствующим образом разделены.

Обратите внимание: поскольку вы можете загружать данные напрямую с помощью fastparquet / pandas, вы, вероятно, также можете сохранить секционированную версию либо с помощью метода to_parquet, либо с помощью функции write fastparquet.

...