Как обновить фрейм данных pyspark новыми значениями из другого фрейма данных? - PullRequest
0 голосов
/ 11 мая 2018

У меня есть два искровых фрейма данных:

Фрейм данных A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

и фрейм данных B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

Фрейм данных B может содержать повторяющиеся, обновленные и новые строкииз фрейма данных A. Я хочу написать в spark операцию, в которой я могу создать новый фрейм данных, содержащий строки из фрейма данных A и обновленные и новые строки из фрейма данных B.

Я начал с создания хеш-столбца, содержащего толькостолбцы, которые не могут быть обновлены.Это уникальный идентификатор.Допустим, col1 и col2 могут изменить значение (может быть обновлено), но col3,..,coln уникальны.Я создал хеш-функцию как hash(col3,..,coln):

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

Теперь я хочу написать некоторый искровой код, который в основном выбирает строки из B, которые имеют хэш не в A (поэтому новые строки иобновленные строки) и объедините их в новый фрейм данных вместе со строками из A. Как этого добиться в pyspark?

Редактировать: Фрейм данных B может иметь дополнительные столбцы из фрейма данных A, поэтому объединениеневозможно.

Пример примера

Кадр данных A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

Кадр данных B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

Результат: Кадр данных C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

Ответы [ 3 ]

0 голосов
/ 11 мая 2018

Это тесно связано с обновлением столбца данных с новыми значениями , за исключением того, что вы также хотите добавить строки из DataFrame B. Один из подходов состоит в том, чтобы сначала сделать то, что указано в связанном вопросе, и затем объедините результат с DataFrame B и удалите дубликаты.

Например:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

Или более обобщенно, используя понимание списка, если у вас есть много столбцов для замены, и вы не хотите жестко кодировать их все:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
0 голосов
/ 21 января 2019

Я бы выбрал другое решение, которое, на мой взгляд, является менее многословным, более общим и не включает в себя перечисление столбцов.Сначала я бы определил подмножество dfA, которое будет обновлено (replaceDf), выполнив внутреннее объединение на основе keyCols (список).Затем я бы вычел этот replaceDF из dfA и объединил его с dfB.

    replaceDf = dfA.alias('a').join(dfB.alias('b'), on=keyCols, how='inner').select('a.*')
    resultDf = dfA.subtract(replaceDf).union(dfB).show()

Даже если в dfA и dfB будут разные столбцы, вы все равно можете преодолеть это, получив список столбцов из обоих DataFrames и найдяих союз.Затем я подготовил бы запрос на выбор (вместо « select. ('A. ') *»), чтобы я просто перечислял столбцы из dfA, которые существуют в dfB + «null as colname» для тех, кто делаетне существует в dfB.

0 голосов
/ 11 мая 2018

Если вы хотите сохранить только уникальные значения и требовать строго правильных результатов, то union, за которым следует dropDupilcates, должны сделать трюк:

columns_which_dont_change = [...]
old_df.union(new_df).dropDuplicates(subset=columns_which_dont_change)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...