Сравнение двух датафреймов в Spark - PullRequest
0 голосов
/ 26 июня 2018

Я сравниваю два фрейма данных в искре, используя except().

Например: df.except(df2)

Я получу все записи, которые недоступны в df2 с df. Тем не менее, я хотел бы также перечислить детали поля, которые не совпадают.

Например:

df:
------------------
id,name,age,city
101,kp,28,CHN
------------------

df2:
-----------------
id,name,age,city
101,kp,28,HYD
----------------

Ожидаемый результат:

df3
--------------------------
id,name,age,city,diff
101,kp,28,CHN,City is not matching
--------------------------------

Как я могу добиться этого?

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Более новая попытка вышеупомянутого, но не возможно изящно, но с JOIN в противоположность кроме. Лучшее, что я могу сделать.

Я считаю, что он делает то, что вам нужно, и учитывает тот факт, что в одном наборе данных есть вещи или нет.

Запустить под Databricks.

case class Person(personid: Int, personname: String, cityid: Int)
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._

val df1 = Seq(
     Person(0, "AgataZ", 0),
     Person(1, "Iweta", 0),
     Person(2, "Patryk", 2),
     Person(9999, "Maria", 2),
     Person(5, "John", 2),
     Person(6, "Patsy", 2),
     Person(7, "Gloria", 222), 
     Person(3333, "Maksym", 0)).toDF

val df2 = Seq(
     Person(0, "Agata", 0),
     Person(1, "Iweta", 0),
     Person(2, "Patryk", 2),
     Person(5, "John", 2),
     Person(6, "Patsy", 333),
     Person(7, "Gloria", 2), 
     Person(4444, "Hans", 3)).toDF

val joined = df1.join(df2, df1("personid") === df2("personid"), "outer") 
val newNames = Seq("personId1", "personName1", "personCity1", "personId2", "personName2", "personCity2")
val df_Renamed = joined.toDF(newNames: _*)

// Some deliberate variation shown in approach for learning 
val df_temp = df_Renamed.filter($"personCity1" =!= $"personCity2" || $"personName1" =!= $"personName2" || $"personName1".isNull || $"personName2".isNull || $"personCity1".isNull || $"personCity2".isNull).select($"personId1", $"personName1".alias("Name"), $"personCity1", $"personId2", $"personName2".alias("Name2"), $"personCity2").  withColumn("PersonID", when($"personId1".isNotNull, $"personId1").otherwise($"personId2"))

val df_final = df_temp.withColumn("nameChange ?", when($"Name".isNull or $"Name2".isNull or $"Name" =!= $"Name2", "Yes").otherwise("No")).withColumn("cityChange ?", when($"personCity1".isNull or $"personCity2".isNull or $"personCity1" =!= $"personCity2", "Yes").otherwise("No")).drop("PersonId1").drop("PersonId2")

df_final.show()

дает:

+------+-----------+------+-----------+--------+------------+------------+
|  Name|personCity1| Name2|personCity2|PersonID|nameChange ?|cityChange ?|
+------+-----------+------+-----------+--------+------------+------------+
| Patsy|          2| Patsy|        333|       6|          No|         Yes|
|Maksym|          0|  null|       null|    3333|         Yes|         Yes|
|  null|       null|  Hans|          3|    4444|         Yes|         Yes|
|Gloria|        222|Gloria|          2|       7|          No|         Yes|
| Maria|          2|  null|       null|    9999|         Yes|         Yes|
|AgataZ|          0| Agata|          0|       0|         Yes|          No|
+------+-----------+------+-----------+--------+------------+------------+
0 голосов
/ 27 июня 2018

Используйте пересечение, чтобы получить значения, общие для обоих фреймов данных, а затем создайте несоответствующую логику

intersect - возвращает новый набор данных, содержащий строки только в этом наборе данных и в другом наборе данных.

df.intersect(df2)
  • возвращает новый RDD, содержащий пересечение элементов в наборе исходных данных и аргумент.

  • пересечение (anotherrdd) возвращает элементы, которые присутствуют в обоих DF.

  • пересечение (anotherrdd) удалить все дубликаты, включая дубликаты в одном DF
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...