Многократное слияние в Dask и имена полей - PullRequest
0 голосов
/ 15 февраля 2019

Я пытаюсь объединить несколько фреймов данных панд в большой фрейм данных Dask с полями ["a_id", "b_id", "c_id"].Каждый кадр данных панд "A", "B" и "C" имеет уникальное поле ("a_id", "b_id" и "c_id"), которое присоединяет его к кадру данных Dask.«B» и «C» также имеют поле «b_Field1»:

import pandas as pd
import dask.dataframe as dd

A = pd.DataFrame({'a_id': [1, 2, 3], 'a_Field1': [0, 0, 0]})
B = pd.DataFrame({'b_id': [3, 4, 5], 'b_Field1': [7, 8, 9]})
C = pd.DataFrame({'c_id': [4, 5, 6], 'b_Field1': [6, 7, 8], 'c_Field1': [10, 11, 12]})

pdf = pd.DataFrame({'a_id': [1, 2], 'b_id': [3, 4], 'c_id': [5, 6]})
pdf = pdf.merge(A, how="left", on="a_id")
pdf = pdf.merge(B, how="left", on="b_id")
pdf = pdf.merge(C, how="left", on=["c_id", "b_Field1"])

print(pdf)

"""
Result:
   a_id  b_id  c_id  a_Field1  b_Field1  c_Field1
0     1     3     5         0         7        11
1     2     4     6         0         8        12
"""

dA = dd.from_pandas(A, npartitions=1)
dB = dd.from_pandas(B, npartitions=1)
dC = dd.from_pandas(C, npartitions=1)
ddf = dd.from_pandas(pdf, npartitions=1)

ddf = ddf.merge(dA, how="left", on="a_id")
ddf = ddf.merge(dB, how="left", on="b_id")
ddf = ddf.merge(dC, how="left", on=["c_id", "b_Field1"])

Это не помогает, говоря, что в ddf нет поля «b_Field1».Я думал, что мне нужно выполнить команду .compute () между слияниями B и C, но это заставляет Dask бесконечно зависать на 40% на индикаторе выполнения (в конечном итоге умирает с MemoryError).

Является ливычислить необходимо перед вторым соединением?И если да, то по какой причине он зависнет?Эти наборы данных едва достаточно малы для объединения в чистые панды, и объединение происходит быстро, но я пытаюсь сделать это развертываемым на машинах с меньшим объемом оперативной памяти.

1 Ответ

0 голосов
/ 19 февраля 2019

Если вы посмотрите на фрейм данных перед последней строкой, вы обнаружите, что в нем есть столбцы:

a_id  b_id  c_id  a_Field1_x  b_Field1_x  c_Field1  a_Field1_y  b_Field1_y

, т. Е. b_Field1 разделился на две части, и действительно получается, что эти двеидентичны.Это, вероятно, ошибка в Dask, поскольку, как вы показываете, в Pandas такого не происходит.Тем не менее, установка соответствующего индекса или настройка необязательных аргументов на merge может быть обходным путем.

С таким фреймом данных вы можете сделать

ddf = ddf.merge(dC.rename(columns={'b_Field1': 'b_Field1_x'}), 
     how="left", on=["c_id", "b_Field1_x"])

, где теперь вы также дублируетесь c_ столбцы.

Что касается общей проблемы с памятью, это подробно обсуждается в другом месте.Обязательно тщательно выбирайте размеры разделов, индекс и количество работников.

...