Объединение Dask вызывает ошибку памяти, когда объединение Pandas не для тех же файлов - PullRequest
2 голосов
/ 29 мая 2019

Я пытаюсь объединить несколько кадров данных dask, но это приводит к израсходованию всей моей оперативной памяти и к аварийному завершению работы моей среды (Google Colab).

Я попытался объединить с Dask, потому что слышал, как Dask разбивает файлы на части, чтобы они легче загружались в память.Тем не менее, Панд способен справиться со своей операцией, а Даска - нет.

Причина, по которой я использую Dask, заключается в том, что когда я пытаюсь сохранить свой фрейм данных Pandas, моя среда падает.Поэтому я пытаюсь понять, сможет ли Dask сохранить мои данные без сбоев, но я застрял при создании своего фрейма данных.

combA = np.load(file2A.format(0) , allow_pickle=True)
combB = np.load(file2B.format(0), allow_pickle=True )
combC = np.load(file2C.format(0), allow_pickle=True )
combD = np.load(file2D.format(0) , allow_pickle=True)
combE = np.load(file2E.format(0) , allow_pickle=True )
combF = np.load(file2F.format(0), allow_pickle=True )

dfAllA = dd.from_pandas(pd.DataFrame(combA), npartitions=10)
dfAllB = dd.from_pandas(pd.DataFrame(combB), npartitions=10)
dfAllC = dd.from_pandas(pd.DataFrame(combC), npartitions=10)
dfAllD = dd.from_pandas(pd.DataFrame(combD), npartitions=10)
dfAllE = dd.from_pandas(pd.DataFrame(combE), npartitions=10)
dfAllF = dd.from_pandas(pd.DataFrame(combF), npartitions=10)

dfAllT = dd.concat([dfAllA, dfAllB, dfAllC, dfAllD, dfAllE, dfAllF ], interleave_partitions=True)

Я хотел бы выполнить объединение без ошибки памяти.

Из приведенного ниже ответа похоже, что я должен определить функцию, которая выполняет загрузку и объединение даты,введите это в функции dask.delayed и выполните .compute () для этих функций

Однако, что-то вроде


def daskFunc1():

    combA = np.load(file2A.format(0) , allow_pickle=True)
    combB = np.load(file2B.format(0), allow_pickle=True )
    combC = np.load(file2C.format(0), allow_pickle=True )
    combD = np.load(file2D.format(0) , allow_pickle=True)
    combE = np.load(file2E.format(0) , allow_pickle=True )
    combF = np.load(file2F.format(0), allow_pickle=True )

    dfAllA = dd.from_pandas(pd.DataFrame(combA), npartitions=10)
    dfAllB = dd.from_pandas(pd.DataFrame(combB), npartitions=10)
    dfAllC = dd.from_pandas(pd.DataFrame(combC), npartitions=10)
    dfAllD = dd.from_pandas(pd.DataFrame(combD), npartitions=10)
    dfAllE = dd.from_pandas(pd.DataFrame(combE), npartitions=10)
    dfAllF = dd.from_pandas(pd.DataFrame(combF), npartitions=10)

def daskFunc2():
    dfAllT = dd.concat([dfAllA, dfAllB, dfAllC, dfAllD, dfAllE, dfAllF ], interleave_partitions=True)

from dask.delayed import delayed

f1 = delayed(daskFunc1)
f2 = delayed(daskFunc2)

f1.compute()
f2.compute()

, когда я тогда попробовал

dfAllT.head()

Я получаю

NameError: name 'dfAllT' is not defined

1 Ответ

1 голос
/ 02 июня 2019

В настоящее время вы загружаете все свои данные в ОЗУ, а затем передаете их в Dask. Dask не может вам сильно помочь, если все ваши данные уже заполняют ОЗУ при первом запуске.

Вместо этого гораздо приятнее сказать Dask, как загружать ваши данные, и позволить ему выполнять загрузку в нужное время. Этот документ может указать вам правильное направление: https://docs.dask.org/en/latest/delayed-collections.html, а вот старый пример https://gist.github.com/mrocklin/e7b7b3a65f2835cda813096332ec73ca

...