Разбивать большие кадры данных (панды) на куски (но после группировки) - PullRequest
0 голосов
/ 07 мая 2018

У меня есть большие табличные данные, которые нужно объединить и разбить по группам. Самый простой способ - использовать панд, но единственной проблемой является память.

У меня есть этот код для объединения фреймов данных:

import pandas as pd;
from functools import reduce;

large_df = pd.read_table('large_file.csv', sep=',')

Это, в основном, загрузка всех данных в память th

# Then I could group the pandas dataframe by some column value (say "block" )
df_by_block = large_df.groupby("block")

# and then write the data by blocks as
for block_id, block_val in df_by_block:
    pd.Dataframe.to_csv(df_by_block, "df_" + str(block_id), sep="\t", index=False)

Единственная проблема с вышеприведенным кодом - это выделение памяти, которое зависает на моем рабочем столе. Я пытался перенести этот код в dask, но dask не имеет аккуратной groupby реализации.

Примечание: Я мог бы просто отсортировать файл, затем прочитать данные построчно и разделить их при изменении значения «блока». Но единственная проблема заключается в том, что «large_df.txt» создается в конвейере в восходящем направлении путем объединения нескольких фреймов данных.

Есть предложения?

Спасибо

Обновление: Я попробовал следующий подход, но он все еще кажется слишком загруженным:

# find unique values in the column of interest (which is to be "grouped by")
large_df_contig = large_df['contig']
contig_list = list(large_df_contig.unique().compute())

# groupby the dataframe 
large_df_grouped = large_df.set_index('contig')

# now, split dataframes
for items in contig_list:
    my_df = large_df_grouped.loc[items].compute().reset_index()
    pd.DataFrame.to_csv(my_df, 'dask_output/my_df_' + str(items), sep='\t', index=False)

Все хорошо, но код

my_df = large_df_grouped.loc[items].compute().reset_index() кажется, снова тянет все в память.

Есть ли способ улучшить этот код ??

1 Ответ

0 голосов
/ 07 мая 2018

но в dask нет аккуратной группыb

На самом деле, dask имеет groupby + определенные пользователем функции с перестановкой OOM .

Вы можете использовать

large_df.groupby(something).apply(write_to_disk)

где write_to_disk - некоторая короткая функция, записывающая блок на диск. По умолчанию в этих случаях dask использует перестановку дисков (в отличие от перестановки в сети). Обратите внимание, что эта операция может быть медленной и все равно может завершиться ошибкой, если размер одной группы превышает вашу память.

...