Pyspark - сравнивает два фрейма данных, удаляя точно совпадающие строки, объединяя строки с различиями, затем обнуляя совпадающие значения - PullRequest
0 голосов
/ 05 февраля 2019

Так что я знаю, как сравнивать два фрейма данных и удалять строки, которые совпадают, используя вычитание.Это нормально.

И я знаю, как объединить значения, которые НЕ соответствуют, при создании нового df с результатами из обеих таблиц, которые не совпадают.

Что я не могу понять, какзатем сделать это - пустые значения, которые совпадают (и оставить идентификатор col только) распределенным способом, используя pyspark

Пример:

df_as_list = [['id','name','monthly_sales'],
              [101,'John Snow', 1234.56],
              [102,'Daenerys Targaryen', 9294.96],
              [103,'Saul Goodman', 1274.57],
              [104,'Bobby Axelrob', 1123459.56],
              [105,'Joe Miller', 34.56],
              [106,'James Holden', 1.23]]
my_schema = df_as_list.pop(0)
df1 = spark.createDataFrame(df_as_list, my_schema)

df_as_list = [['id','name','monthly_sales'],
              [101,'John Snow', 777.56],
              [102,'Daenerys Targaryen', 9294.96],
              [103,'Saul Goodman', 1274.57],
              [104,'Bobby Axelrob', 1123459.56],
              [105,'Joe Miller', 34.56],
              [1106,'James Holden', 1.23]]
my_schema = df_as_list.pop(0)
df2 = spark.createDataFrame(df_as_list, my_schema)

df1.show()
df2.show()

Желаемый результат:

+---+------------------+-------------+
| id|              name|monthly_sales|
+---+------------------+-------------+
|101|                  |      1234.56|
|101|                  |       777.56|
+---+------------------+-------------+

1 Ответ

0 голосов
/ 05 февраля 2019

Один из способов - сначала найти id s, где есть различия, и выяснить, какие столбцы равны:

from functools import reduce

diffs = df1.join(df2, on="id")\
    .where(reduce(lambda a, b: a|b, [df1[c] != df2[c] for c in df1.columns]))\
    .select("id", *[(df1[c] == df2[c]).alias(c) for c in df1.columns if c != "id"])
diffs.show()
#+---+----+-------------+
#| id|name|monthly_sales|
#+---+----+-------------+
#|101|true|        false|
#+---+----+-------------+

Условие reduce(lambda a, b: a|b, [df1[c] != df2[c] for c in df1.columns]) будет содержать только строки, в которых хотя бы один столбецразличается между двумя фреймами данных.

Теперь используйте diffs, чтобы присоединиться к объединению двух фреймов данных, и используйте логические значения, чтобы определить, хотите ли вы отобразить столбец, или null, если они совпадают.

from pyspark.sql.functions import when, col, lit

df1.union(df2).alias("u")\
    .join(diffs.alias("d"), on="id")\
    .select(
        "id", 
        *[
            when(
                col("d."+c), 
                lit(None)
            ).otherwise(col("u."+c)).alias(c) 
            for c in diffs.columns 
            if c != "id"
        ]
    )\
    .show()
#+---+----+-------------+
#| id|name|monthly_sales|
#+---+----+-------------+
#|101|null|       777.56|
#|101|null|      1234.56|
#+---+----+-------------+

Вы должны будете поместить null в соответствующие столбцы (в отличие от пустой строки), потому что типы для столбца должны быть согласованными (если вы не приводите все к строке).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...