Dask хватает памяти даже с кусками - PullRequest
0 голосов
/ 22 января 2020

Я работаю с большими CSV-файлами и Мне нужно сделать декартово произведение (операция слияния) . Я пытался столкнуться с проблемой с Pandas (вы можете проверить код Panda и пример формата данных на ту же проблему , здесь ) без успеха из-за ошибок памяти. Теперь я пытаюсь использовать Dask, который должен управлять огромными наборами данных, даже если его размер больше доступной оперативной памяти.

Прежде всего, я читаю оба CSV:

from dask import dataframe as dd

BLOCKSIZE = 64000000  # = 64 Mb chunks


df1_file_path = './mRNA_TCGA_breast.csv'
df2_file_path = './miRNA_TCGA_breast.csv'

# Gets Dataframes
df1 = dd.read_csv(
    df1_file_path,
    delimiter='\t',
    blocksize=BLOCKSIZE
)
first_column = df1.columns.values[0]
df1.set_index(first_column)
df2 = dd.read_csv(
    df2_file_path,
    delimiter='\t',
    blocksize=BLOCKSIZE
)
first_column = df2.columns.values[0]
df2.set_index(first_column)

# Filter common columns
common_columns = df1.columns.intersection(df2.columns)
df1 = df1[common_columns]
df2 = df2[common_columns]

Затем я выполняю операцию сохранения на диске, чтобы предотвратить ошибки памяти:

# Computes a Cartesian product
df1['_tmpkey'] = 1
df2['_tmpkey'] = 1

# Neither of these two options work
# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_hdf('/tmp/merge.*.hdf', key='/merge_data')
# df1.merge(df2, on='_tmpkey').drop('_tmpkey', axis=1).to_parquet('/tmp/')

Я сделал репо, чтобы попытаться использовать точно такие же файлы CSV, которые я использую . Я пытался с меньшими значениями blocksize, но я получил ту же ошибку. Я что-то пропустил? Любая помощь будет очень признательна.

1 Ответ

1 голос
/ 25 января 2020

Я успешно выполнил ваш код, используя следующий метод с объемом памяти, ограниченным 32 ГБ.

Я избавился от аргумента BLOCKSIZE и использовал repartition вместо этого на df1 и df2.

df1 = df1.repartition(npartitions=50)
df2 = df2.repartition(npartitions=1)

Обратите внимание, что размер df2 действительно меньше по сравнению с df1 ( 2,5 МБ против 23,75 МБ ), поэтому я сохранил только один раздел для df2 и Разрежьте df1 на 50 разделов.

И это должно заставить код работать на вас. Для меня используемая память осталась ниже 12 ГБ.

Чтобы проверить, я вычислил длину результата:

len(df) # 3001995

Следуя вышесказанному, создаем файл паркета с 50 разделами. Вы можете снова использовать repartition, чтобы получить нужный размер раздела.

NB:

Добавление этого должно ускорить ваш код:

from dask.distributed import Client
client = Client()

В моем случае мне пришлось использовать аргумент Client(processes=False) из-за моей рабочей среды.

...