Обновление данных PySpark ETL - PullRequest
0 голосов
/ 06 марта 2020

У меня есть тефреймы данных Pyspark, и я хочу поменять «целевой» фрейм данных на «промежуточный» в соответствии с ключом ... Какой лучший способ оптимизировать это в Pyspark?

target
+---+-----------------------+------+------+
|key|updated_timestamp      |field0|field1|
+---+-----------------------+------+------+
|005|2019-10-26 21:02:30.638|cdao  |coaame|
|001|2019-10-22 13:02:30.638|aaaaaa|fsdc  |
|002|2019-12-22 11:42:30.638|stfi  |?     |
|004|2019-10-21 14:02:30.638|ct    |ome   |
|003|2019-10-24 21:02:30.638|io    |me    |
+---+-----------------------+------+------+

staging
+---+-----------------------+----------+---------+
|key|updated_timestamp      |field0    |field1   |
+---+-----------------------+----------+---------+
|006|2020-03-06 01:42:30.638|new record|xxaaame  |
|005|2019-10-29 09:42:30.638|cwwwwdao  |coaaaaame|
|004|2019-10-29 21:03:35.638|cwwwwdao  |coaaaaame|
+---+-----------------------+----------+---------+

output dataframe

+---+-----------------------+----------+---------+
|key|updated_timestamp      |field0    |field1   |
+---+-----------------------+----------+---------+
|005|2019-10-29 09:42:30.638|cwwwwdao  |coaaaaame|
|001|2019-10-22 13:02:30.638|aaaaaa    |fsdc     |
|002|2019-12-22 11:42:30.638|stfi      |?        |
|004|2019-10-29 21:03:35.638|cwwwwdao  |coaaaaame|
|003|2019-10-24 21:02:30.638|io        |me       |
|006|2020-03-06 01:42:30.638|new record|xxaaame  |
+---+-----------------------+----------+---------+

Ответы [ 2 ]

1 голос
/ 06 марта 2020

Другое решение с использованием union:

output = staging.union(
    target.join(
        staging,
        on="key",
        how="left_anti"
    )
)
1 голос
/ 06 марта 2020

Есть несколько способов достичь этого. Вот пример использования полного внешнего соединения:

from pyspark.sql import functions as F

output = staging.join(
    target,
    on='key',
    how='full'
).select(
    *(
        F.coalesce(staging[col], target[col]).alias(col)
        for col 
        in staging.columns
    )
)

Это работает, только если обновленное значение не NULL.

...