Как сравнить два кадра данных в Scala - PullRequest
0 голосов
/ 29 июня 2018

У меня есть два абсолютно одинаковых кадра данных для сравнительного теста

     df1
     ------------------------------------------
     year | state | count2 | count3 | count4|
     2014 | NJ    | 12332  | 54322  | 53422 |
     2014 | NJ    | 12332  | 53255  | 55324 |
     2015 | CO    | 12332  | 53255  | 55324 |
     2015 | MD    | 14463  | 76543  | 66433 |
     2016 | CT    | 14463  | 76543  | 66433 |
     2016 | CT    | 55325  | 76543  | 66433 |
     ------------------------------------------
     df2
     ------------------------------------------
     year | state | count2 | count3 | count4|
     2014 | NJ    | 12332  | 54322  | 53422 |
     2014 | NJ    | 65333  | 65555  | 125   |
     2015 | CO    | 12332  | 53255  | 55324 |
     2015 | MD    | 533    | 75     | 64524 |
     2016 | CT    | 14463  | 76543  | 66433 |
     2016 | CT    | 55325  | 76543  | 66433 |
     ------------------------------------------

Я хочу сравнить с этими двумя значениями dfs от count2 до count4, если количество не совпадает, выведите какое-то сообщение о том, что оно не соответствует. вот моя попытка

     val cols = df1.columns.filter(_ != "year").toList
     def mapDiffs(name: String) = when($"l.$name" === $"r.$name", null).otherwise(array($"l.$name", $"r.$name")).as(name)
     val result = df1.as("l").join(df2.as("r"), "year").select($"year" :: cols.map(mapDiffs): _*)

тогда он сравнивается с тем же состоянием с тем же номером, он не сделал то, что я хотел сделать

     ------------------------------------------
     year | state | count2 | count3 | count4|
     2014 | NJ    | 12332  | 54322  | 53422 |
     2014 | NJ    | no     | no     | no    |
     2015 | CO    | 12332  | 53255  | 55324 |
     2015 | MD    | no     | no     | 64524 |
     2016 | CT    | 14463  | 76543  | 66433 |
     2016 | CT    | 55325  | 76543  | 66433 |
     ------------------------------------------

Я хочу, чтобы результат получился, как указано выше, как мне этого добиться?

редактирует, также в другом сценарии, если я хочу сравнить только в одном df, col с cols, как мне это сделать? как

 ------------------------------------------
 year | state | count2 | count3 | count4|
 2014 | NJ    | 12332  | 54322  | 53422 |

Я хочу сравнить count3 и считать 4 столбца с count2, очевидно, count3 и count 4 не совпадают с count2, поэтому я хочу, чтобы результат был

-----------------------------------------------
 year | state | count2 | count3    | count4   |
 2014 | NJ    | 12332  | mismatch  | mismatch |

Спасибо!

1 Ответ

0 голосов
/ 29 июня 2018

Фрейм данных join на year не будет работать для вашего метода mapDiffs. Вам нужен столбец с идентификатором строки в df1 и df2 для join.

import org.apache.spark.sql.functions._

val df1 = Seq(
  ("2014", "NJ", "12332", "54322", "53422"),
  ("2014", "NJ", "12332", "53255", "55324"),
  ("2015", "CO", "12332", "53255", "55324"),
  ("2015", "MD", "14463", "76543", "64524"),
  ("2016", "CT", "14463", "76543", "66433"),
  ("2016", "CT", "55325", "76543", "66433")
).toDF("year", "state", "count2", "count3", "count4")

val df2 = Seq(
  ("2014", "NJ", "12332", "54322", "53422"),
  ("2014", "NJ", "12332", "53255", "125"),
  ("2015", "CO", "12332", "53255", "55324"),
  ("2015", "MD", "533",   "75",    "64524"),
  ("2016", "CT", "14463", "76543", "66433"),
  ("2016", "CT", "55325", "76543", "66433")
).toDF("year", "state", "count2", "count3", "count4")

Пропустите это, если у вас уже есть столбец определения строки (скажем, rowId) в кадрах данных для join:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rdd1 = df1.rdd.zipWithIndex.map{
  case (row: Row, id: Long) => Row.fromSeq(row.toSeq :+ id)
}
val df1i = spark.createDataFrame( rdd1,
  StructType(df1.schema.fields :+ StructField("rowId", LongType, false))
)

val rdd2 = df2.rdd.zipWithIndex.map{
  case (row: Row, id: Long) => Row.fromSeq(row.toSeq :+ id)
}
val df2i = spark.createDataFrame( rdd2,
  StructType(df2.schema.fields :+ StructField("rowId", LongType, false))
)

Теперь определите mapDiffs и примените его к выбранным столбцам после объединения фреймов данных с помощью rowId:

def mapDiffs(name: String) =
  when($"l.$name" === $"r.$name", $"l.$name").otherwise("no").as(name)

val cols = df1i.columns.filter(_.startsWith("count")).toList

val result = df1i.as("l").join(df2i.as("r"), "rowId").
  select($"l.rowId" :: $"l.year" :: cols.map(mapDiffs): _*)

// +-----+----+------+------+------+
// |rowId|year|count2|count3|count4|
// +-----+----+------+------+------+
// |    0|2014| 12332| 54322| 53422|
// |    5|2016| 55325| 76543| 66433|
// |    1|2014| 12332| 53255|    no|
// |    3|2015|    no|    no| 64524|
// |    2|2015| 12332| 53255| 55324|
// |    4|2016| 14463| 76543| 66433|
// +-----+----+------+------+------+

Обратите внимание, что между df1 и df2, по-видимому, имеется больше расхождений, чем только 3 no -почек в вашем примере результата. Я изменил пример данных, чтобы сделать эти три пятна единственной разницей.

...