Сравните два кадра данных Pyspark - PullRequest
1 голос
/ 18 февраля 2020

Я пытаюсь сравнить два фрейма данных с одинаковым количеством столбцов, т.е. 4 столбца с идентификатором ключа в качестве ключевого столбца в обоих фреймах данных.

df1 = spark.read.csv("/path/to/data1.csv")
df2 = spark.read.csv("/path/to/data2.csv")

Теперь я хочу добавить новый столбец в DF2, то есть column_names это список столбцов с значениями, отличными от df1

df2.withColumn("column_names",udf())

DF1

+------+---------+--------+------+
|   id | |name  | sal  | Address |
+------+---------+--------+------+
|     1|  ABC   | 5000 | US      |
|     2|  DEF   | 4000 | UK      |
|     3|  GHI   | 3000 | JPN     |
|     4|  JKL   | 4500 | CHN     |
+------+---------+--------+------+

DF2:

+------+---------+--------+------+
|   id | |name  | sal  | Address |
+------+---------+--------+------+
|     1|  ABC   | 5000 | US      |
|     2|  DEF   | 4000 | CAN     |
|     3|  GHI   | 3500 | JPN     |
|     4|  JKL_M | 4800 | CHN     |
+------+---------+--------+------+

Теперь я хочу DF3

DF3:

+------+---------+--------+------+--------------+
|   id | |name  | sal  | Address | column_names |
+------+---------+--------+------+--------------+
|     1|  ABC   | 5000 | US      |  []          |
|     2|  DEF   | 4000 | CAN     |  [address]   |
|     3|  GHI   | 3500 | JPN     |  [sal]       |
|     4|  JKL_M | 4800 | CHN     |  [name,sal]  |
+------+---------+--------+------+--------------+

Я видел этот вопрос SO, Как сравнить два столбца данных и вывести столбцы, которые отличается в scala. Попробовал, но результат будет другим.

Я думаю о том, чтобы перейти с функцией UDF, передав строку из каждого кадра данных в udf, сравнив столбец за столбцом и возвращая список столбцов. Однако для этого оба фрейма данных должны быть в отсортированном порядке, чтобы одинаковые строки идентификаторов были отправлены в udf. Сортировка здесь дорогостоящая операция. Любое решение?

Ответы [ 3 ]

3 голосов
/ 18 февраля 2020

Предполагая, что мы можем использовать id для объединения этих двух наборов данных, я не думаю, что есть необходимость в UDF. Эту проблему можно решить, просто используя функции внутреннего объединения, функции array и array_remove .

Сначала давайте создадим два набора данных:

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])

Сначала мы выполняем внутреннее соединение между двумя наборами данных, затем генерируем условие df1[col] != df2[col] для каждого столбца, кроме id. Когда столбцы не равны, мы возвращаем имя столбца, в противном случае - пустую строку. Список условий будет состоять из элементов массива, из которого мы окончательно удаляем пустые элементы:

from pyspark.sql.functions import col, array, when, array_remove

# get conditions for all columns except id
conditions_ = [when(df1[c]!=df2[c], c).otherwise("") for c in df1.columns if c != 'id']

select_expr =[
                col("id"), 
                *[df2[c] for c in df2.columns if c != 'id'], 
                array_remove(array(*conditions_), "").alias("column_names")
]

df1.join(df2, "id").select(*select_expr).show()

# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# |  1|  ABC|5000|     US|          []|
# |  3|  GHI|3500|    JPN|       [sal]|
# |  2|  DEF|4000|    CAN|   [Address]|
# |  4|JKL_M|4800|    CHN| [name, sal]|
# +---+-----+----+-------+------------+
3 голосов
/ 18 февраля 2020

Python: PySpark-версия моего предыдущего scala кода.

import pyspark.sql.functions as f

df1 = spark.read.option("header", "true").csv("test1.csv")
df2 = spark.read.option("header", "true").csv("test2.csv")

columns = df1.columns
df3 = df1.alias("d1").join(df2.alias("d2"), f.col("d1.id") == f.col("d2.id"), "left")

for name in columns:
    df3 = df3.withColumn(name + "_temp", f.when(f.col("d1." + name) != f.col("d2." + name), f.lit(name)))


df3.withColumn("column_names", f.concat_ws(",", *map(lambda name: f.col(name + "_temp"), columns))).select("d1.*", "column_names").show()

Scala: Вот мой лучший подход для вашей проблемы.

val df1 = spark.read.option("header", "true").csv("test1.csv")
val df2 = spark.read.option("header", "true").csv("test2.csv")

val columns = df1.columns
val df3 = df1.alias("d1").join(df2.alias("d2"), col("d1.id") === col("d2.id"), "left")

columns.foldLeft(df3) {(df, name) => df.withColumn(name + "_temp", when(col("d1." + name) =!= col("d2." + name), lit(name)))}
  .withColumn("column_names", concat_ws(",", columns.map(name => col(name + "_temp")): _*))
  .show(false)

Сначала я объединяю два кадра данных в df3 и использую столбцы из df1. Сгибая влево до df3 с временными столбцами, которые имеют значение для имени столбца, когда df1 и df2 имеют одинаковые id и другие значения столбца.

После этого concat_ws для эти имена столбцов и нулевые значения исчезли, и остались только имена столбцов.

+---+----+----+-------+------------+
|id |name|sal |Address|column_names|
+---+----+----+-------+------------+
|1  |ABC |5000|US     |            |
|2  |DEF |4000|UK     |Address     |
|3  |GHI |3000|JPN    |sal         |
|4  |JKL |4500|CHN    |name,sal    |
+---+----+----+-------+------------+

Единственное, что отличается от ожидаемого результата, это то, что вывод - это не список, а строка.

ps Я забыл использовать PySpark, но это нормальная искра, извините.

2 голосов
/ 18 февраля 2020

Вот ваше решение с UDF, я изменил имя dataframe динамически, чтобы оно не было неоднозначным во время проверки. Go с помощью приведенного ниже кода и сообщите мне, если возникнут какие-либо вопросы.

>>> from pyspark.sql.functions import *
>>> df.show()
+---+----+----+-------+
| id|name| sal|Address|
+---+----+----+-------+
|  1| ABC|5000|     US|
|  2| DEF|4000|     UK|
|  3| GHI|3000|    JPN|
|  4| JKL|4500|    CHN|
+---+----+----+-------+

>>> df1.show()
+---+----+----+-------+
| id|name| sal|Address|
+---+----+----+-------+
|  1| ABC|5000|     US|
|  2| DEF|4000|    CAN|
|  3| GHI|3500|    JPN|
|  4|JKLM|4800|    CHN|
+---+----+----+-------+

>>> df2 = df.select([col(c).alias("x_"+c) for c in df.columns])
>>> df3 = df1.join(df2, col("id") == col("x_id"), "left")

 //udf declaration 

>>> def CheckMatch(Column,r):
...     check=''
...     ColList=Column.split(",")
...     for cc in ColList:
...             if(r[cc] != r["x_" + cc]):
...                     check=check + "," + cc
...     return check.replace(',','',1).split(",")

>>> CheckMatchUDF = udf(CheckMatch)

//final column that required to select
>>> finalCol = df1.columns
>>> finalCol.insert(len(finalCol), "column_names")

>>> df3.withColumn("column_names", CheckMatchUDF(lit(','.join(df1.columns)),struct([df3[x] for x in df3.columns])))
       .select(finalCol)
       .show()
+---+----+----+-------+------------+
| id|name| sal|Address|column_names|
+---+----+----+-------+------------+
|  1| ABC|5000|     US|          []|
|  2| DEF|4000|    CAN|   [Address]|
|  3| GHI|3500|    JPN|       [sal]|
|  4|JKLM|4800|    CHN| [name, sal]|
+---+----+----+-------+------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...