Как рассчитать дельту между двумя кадрами? - PullRequest
0 голосов
/ 27 января 2020

Я бы хотел вычислить дельту между двумя таблицами (текущая полная и вчерашняя полная).

val df_current_full := spark.sql("select * from current_full")
val df_previous_full := spark.sql("select * from previous_full")

Я делаю полное внешнее соединение между df_current_full и df_previous_full для ключа.

val df_currentFullTableExceptPreviousFullCurrentView: DataFrame = df_currentFullTable
  .join(df_previousFullCurrentView, df_currentFullTable(key) ===
    df_previousFullCurrentView(key), "full_outer")

Чтобы узнать, удалены или созданы строки, я могу сделать следующее:

val df_currentFullTableExceptPreviousFullCurrentView: DataFrame = df_currentFullTable
  .join(df_previousFullCurrentView, df_currentFullTable(key) === df_previousFullCurrentView(key), "full_outer")
  .withColumn("flagCreatedDeleted", UDF_udfCreateFlagCreatedDeleted(df_currentFullTable(key),
    df_previousFullCurrentView(key)))

val UDF_udfCreateFlagCreatedDeleted = udf(udfCreateFlagCreatedDeleted _)

def udfCreateFlagCreatedDeleted(df_currentFullTable_key: String, df_currentPreviousTable_key: String): String = {


  if (df_currentFullTable_key == null && df_currentPreviousTable_key != null) return "S"
  else if (df_currentFullTable_key != null && df_currentPreviousTable_key == null) return "C"
  else return null
}

Но у меня проблема с измененными строками? Как я могу получить их? У меня есть строки, int, столбцы даты в таблицах.

Спасибо за вашу помощь

Код становится очень длинным, если у меня 50 столбцов и тип не совпадает

val df_currentFullTableExceptPreviousFullCurrentView: DataFrame = df_currentFullTable
  .join(df_previousFullCurrentView, df_currentFullTable(key) === df_previousFullCurrentView(key), "full_outer")

  .withColumn("flagCreatedDeleted", UDF_udfCreateFlagCreatedDeleted(df_currentFullTable(key),
    df_previousFullCurrentView(key)))
  .withColumn("flagModifiedStringNameId", UDF_udfCreateFlagModifiedString(df_currentFullTable(key),
    df_previousFullCurrentView(key), df_currentFullTable("name_id"), df_previousFullCurrentView("name_id")))
  .withColumn("flagModifiedStringSurname", UDF_udfCreateFlagModifiedString(df_currentFullTable(key),
    df_previousFullCurrentView(key), df_currentFullTable("Surname"), df_previousFullCurrentView("Surname")))
  .withColumn("flagModifiedStringAge", UDF_udfCreateFlagModifiedString(df_currentFullTable(key),
    df_previousFullCurrentView(key), df_currentFullTable("Age"), df_previousFullCurrentView("Age")))
  .withColumn("flagModifiedStringWorkingE", UDF_udfCreateFlagModifiedString(df_currentFullTable(key),
    df_previousFullCurrentView(key), df_currentFullTable("WorkingE"), df_previousFullCurrentView("Working")))

val UDF_udfCreateFlagModifiedString = udf(udfCreateFlagModifiedString _)

def udfCreateFlagModifiedString(df_currentFullTable_key: String, df_currentPreviousTable_key: String,
                                CurrentStringModified: String, PreviousStringModified: String): String = {
  if (df_currentFullTable_key == df_currentPreviousTable_key &&
    CurrentStringModified != PreviousStringModified)
    return "U"

  else return null
}

Ответы [ 2 ]

0 голосов
/ 27 января 2020

Для этого вам даже не нужна UDF: если previous.id равно нулю, то строка была создана, если current.id равно нулю, то она была удалена. Если оба значения не равны NULL, это означает, что строка присутствует в обоих кадрах данных, поэтому вы можете проверить равенство обеих строк. Если они разные, значит, произошло обновление.

val prev = Seq(Data(1, "foo", "bar"), Data(2, "foo2", "bar2"), Data(3, "foo3", "bar3")).toDF
val curr = Seq(Data(1, "foo", "barNew"), Data(3, "foo3", "bar3"), Data(4, "foo4", "bar4")).toDF

prev.createOrReplaceTempView("previous_full")
curr.createOrReplaceTempView("current_full")

spark.sql("""
  select *,
       (case when previous_full.id is null then 'C'
             when current_full.id is null then 'S'
             when struct(previous_full.*) <> struct(current_full.*) then 'U'
             else null end) as flag
  from previous_full
  full outer join current_full on previous_full.id = current_full.id""").show

/*
+----+----+----+----+----+------+----+
|  id|   x|   y|  id|   x|     y|flag|
+----+----+----+----+----+------+----+
|   1| foo| bar|   1| foo|barNew|   U|
|   3|foo3|bar3|   3|foo3|  bar3|null|
|null|null|null|   4|foo4|  bar4|   C|
|   2|foo2|bar2|null|null|  null|   S|
+----+----+----+----+----+------+----+
*/
0 голосов
/ 27 января 2020

Вы можете сделать так же:

val isUpdatedColumnUDF = udf(isUpdatedColumn _)

def isUpdatedColumn(currentColumn: String, previousColumn: String): String = 
  if (previousColumn != currentColumn) return "updated"
  else null
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...