Spark2 dataframe получает разницу двух таблиц, с некоторым исключением - PullRequest
0 голосов
/ 06 июня 2019

Допустим, у меня есть две таблицы, tableA и tableB, они имеют одинаковую схему. Теперь я хотел бы получить несоответствие двух таблиц с одинаковым первичным ключом, но с некоторым исключением значений некоторых столбцов.

Итак, если любой столбец ссылки на таблицу содержит xx, мы считаем, что это будет соответствовать значению другого столбца таблицы. Может кто-нибудь помочь мне с этим в Java? Мне было трудно читать скала код

tableA 
+--+------+------+------+
|id| name |  type|  ref | 
+--+------+------+------+
| 1|aaa   |     a|    a1| 
| 2|bbb   |     b|    xx|
| 3|ccc   |     c|    c3| 
| 4|ddd   |     d|    d4| 
| 6|fff   |     f|    f6| 
| 7|ggg   |     0|    g7| 
+--+------+------+------+

tableB
+--+------+------+------+
|id| name |  type|  ref | 
+--+------+------+------+
| 1|aaa   |     a|    a1| 
| 2|bbb   |     b|    b2|
| 3|ccc   |     c|    xx| 
| 5|eee   |     e|    e5| 
| 6|fff   |     f|   f66| 
| 7|ggg   |     g|    g7| 
+--+------+------+------+    
Expected results:
+--+------+------+-------------+
|id| name |  type|         ref | 
+--+------+------+-------------+
| 6|fff   |     f|   [f6 ->f66]| 
| 7|ggg   |     0|   [0 -> g7 ]| 
+--+------+------+-------------+

Кажется, это работает нормально, но я не уверен в этом.

Dataset<Row> join = data1.join(data2, data1.col("id").equalTo(data2.col("id"))
        .and(data1.col("name").equalTo(data2.col("name")))
        .and(data1.col("type").equalTo(data2.col("type"))
        .and(data1.col("ref").equalTo(data2.col("ref"))
        .or(data1.col("ref").equalTo(lit("xx")))
        .or(data2.col("ref").equalTo(lit("xx"))))), "left_semi");

1 Ответ

0 голосов
/ 06 июня 2019

вы можете просто использовать UDF (User Defined Functions) для достижения этой цели, как только вы объединили свои два кадра данных, как показано ниже:

import sparkSession.sqlContext.implicits._

val df1 = Seq((1, "aaa", "a", "a1"), (2, "bbb", "b", "xx"), (3, "ccc", "c", "c3"), (4, "ddd", "d", "d4"), (6, "fff", "f", "f6")).toDF("id", "name", "type", "ref")
val df2 = Seq((1, "aaa", "a", "a1"), (2, "bbb", "b", "b2"), (3, "ccc", "c", "xx"), (4, "ddd", "d", "d4"), (6, "fff", "f", "f66")).toDF("id", "name", "type", "ref")

val diffCondition: UserDefinedFunction = udf {
  (ref1: String, ref2: String) => {
    var result: String = null
    if (!ref1.equals(ref2) && !"xx".equals(ref1) && !"xx".equals(ref2)) {
      result = s"$ref1 -> $ref2"
    }

    result
  }
}

df1.join(df2, Seq("id", "name", "type"))
  .withColumn("difference", diffCondition(df1("ref"), df2("ref")))
  .filter("difference is not null")
  .show()

и вывод

+---+----+----+---+---+----------+
| id|name|type|ref|ref|difference|
+---+----+----+---+---+----------+
|  6| fff|   f| f6|f66| f6 -> f66|
+---+----+----+---+---+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...