Написание огромных dask-фреймов данных для паркета не хватает памяти - PullRequest
0 голосов
/ 04 марта 2019

Я в основном конвертирую некоторые csv файлы в parquet.Для этого я решил использовать dask, прочитать csv на dask и записать обратно в parquet.Я использую большой размер блока по запросу клиента (500 МБ).Размеры csv составляют 15 ГБ и более (до 50 ГБ), машина имеет 64 ГБ ОЗУ.Всякий раз, когда я запускаю базовую команду to_parquet, объем ОЗУ начинает увеличиваться и в итоге становится настолько высоким, что linux убивает процесс.Кто-нибудь знает, почему это происходит?Когда я не задаю размеры блоков , он работает, но создает множество небольших файлов паркета (24 МБ).Есть ли способ решить это создание блоков не менее 500 МБ.

_path = 'E://'

dt = dd.read_csv(_path+'//temporal.csv', blocksize = 500e5)

dt.to_parquet(path=_path+'/t.parq', compression='gzip')`

Ответы [ 2 ]

0 голосов
/ 06 марта 2019

Вы говорите, что ваши разделы имеют размер около 15 ГБ каждый.Если вы используете Dask со многими потоками, то в памяти их может быть несколько.Если у вас есть только 64 ГБ, то вполне возможно, что не хватает оперативной памяти.

Некоторые параметры:

  1. Используйте меньший размер чанка
  2. Кодируйте данные по-разному, чтобы они занимали меньше места.Например, текстовые данные (частая причина разрушения памяти в Python) могут быть более эффективно сохранены в виде категорий
  3. Использовать меньше потоков
0 голосов
/ 04 марта 2019

Пожалуйста, давайте примем это как подробный комментарий.Почему бы сначала не разделить файлы в 500 МБ CSV, а затем преобразовать в паркет с помощью dask?

import pandas as pd
import numpy as np
import os

fldr = "data/splitted"
fldr_out = "data/splitted_parquet"
os.makedirs(fldr)
os.makedirs(fldr_out)

# this for a ~4gb csv
rows = int(1e7)
cols = 20
df = pd.DataFrame(np.random.rand(rows, cols),
                  columns=["col_{}".format(i) for i in range(cols)])


df.to_csv("data/file.csv")

В Linux вы можете разбить его на файлы по 500 М с помощью

split -b 500M --additional-suffix=".csv" file.csv splitted/file_part_

ТеперьВы можете преобразовать в паркет с DASK

from dask import compute()
from dask import delayed

@delayed
def csv2parq(fn):
    out = fn.replace(fldr, fldr_out)
    pd.read_csv(fn).to_parquet(out)

fns = os.listdir(fldr)
fns = [os.path.join(fldr, fn) for fn in fns]
compute([csv2parq(fn) for fn in fns])
...