Я хочу вычислить дельту между двумя таблицами: текущей полной выгрузкой и вчерашней полной выгрузкой.
// Load both full snapshots: today's table and yesterday's table.
val df_current_full = spark.sql("select * from current_full")
val df_previous_full = spark.sql("select * from previous_full")
Я выполняю полное внешнее соединение (full outer join) таблиц df_current_full и df_previous_full по ключу (в коде ниже они называются df_currentFullTable и df_previousFullCurrentView).
// Full outer join of the current and previous snapshots on the `key` column.
val df_currentFullTableExceptPreviousFullCurrentView: DataFrame = {
  val currentKey = df_currentFullTable(key)
  val previousKey = df_previousFullCurrentView(key)
  df_currentFullTable.join(df_previousFullCurrentView, currentKey === previousKey, "full_outer")
}
Чтобы определить, какие строки были удалены, а какие созданы, я добавляю столбец-флаг:
// Full outer join on `key`, plus a "flagCreatedDeleted" column computed from the two key columns.
val df_currentFullTableExceptPreviousFullCurrentView: DataFrame = {
  val currentKey = df_currentFullTable(key)
  val previousKey = df_previousFullCurrentView(key)
  val joined = df_currentFullTable.join(df_previousFullCurrentView, currentKey === previousKey, "full_outer")
  joined.withColumn("flagCreatedDeleted", UDF_udfCreateFlagCreatedDeleted(currentKey, previousKey))
}
// Wraps udfCreateFlagCreatedDeleted with `udf` so it can be applied to the two key columns.
val UDF_udfCreateFlagCreatedDeleted = udf(
  (currentKey: String, previousKey: String) => udfCreateFlagCreatedDeleted(currentKey, previousKey)
)
/**
 * Classifies a joined row as created or deleted from the two key values.
 *
 * @param df_currentFullTable_key     key taken from the current table; null when the row is absent there
 * @param df_currentPreviousTable_key key taken from the previous table; null when the row is absent there
 * @return "S" when only the previous key is present, "C" when only the current key is present,
 *         and null in every other case (both present or both null)
 */
def udfCreateFlagCreatedDeleted(df_currentFullTable_key: String, df_currentPreviousTable_key: String): String =
  (Option(df_currentFullTable_key), Option(df_currentPreviousTable_key)) match {
    case (None, Some(_)) => "S"
    case (Some(_), None) => "C"
    // null is returned on purpose: the declared result type is a plain String.
    case _ => null
  }
Но у меня проблема с изменёнными строками: как их найти? В таблицах есть столбцы разных типов: строковые, целочисленные (int) и даты.
Спасибо за вашу помощь
Код становится очень длинным, если в таблице 50 столбцов и их типы различаются:
// Full outer join on `key`, then one flag column per compared data column.
// The compared columns are listed once as (flag column name -> data column name) pairs, so the
// current and previous sides always read the same column and a new column needs only one new pair.
val df_currentFullTableExceptPreviousFullCurrentView: DataFrame = {
  val currentKey = df_currentFullTable(key)
  val previousKey = df_previousFullCurrentView(key)

  val joinedWithCreatedDeleted = df_currentFullTable
    .join(df_previousFullCurrentView, currentKey === previousKey, "full_outer")
    .withColumn("flagCreatedDeleted", UDF_udfCreateFlagCreatedDeleted(currentKey, previousKey))

  val comparedColumns = Seq(
    "flagModifiedStringNameId" -> "name_id",
    "flagModifiedStringSurname" -> "Surname",
    "flagModifiedStringAge" -> "Age",
    "flagModifiedStringWorkingE" -> "WorkingE"
  )

  // Columns are added in the order listed above, matching the original chain of withColumn calls.
  comparedColumns.foldLeft(joinedWithCreatedDeleted) { case (acc, (flagName, columnName)) =>
    acc.withColumn(
      flagName,
      UDF_udfCreateFlagModifiedString(
        currentKey,
        previousKey,
        df_currentFullTable(columnName),
        df_previousFullCurrentView(columnName)
      )
    )
  }
}
// Wraps udfCreateFlagModifiedString with `udf` so it can be applied to key and value columns.
val UDF_udfCreateFlagModifiedString = udf(
  (currentKey: String, previousKey: String, currentValue: String, previousValue: String) =>
    udfCreateFlagModifiedString(currentKey, previousKey, currentValue, previousValue)
)
/**
 * Flags a value as updated when the row exists on both sides and the value differs.
 *
 * @param df_currentFullTable_key     key taken from the current table (may be null)
 * @param df_currentPreviousTable_key key taken from the previous table (may be null)
 * @param CurrentStringModified       value of the compared column in the current table (may be null)
 * @param PreviousStringModified      value of the compared column in the previous table (may be null)
 * @return "U" when both keys are non-null and equal and the two values differ; null otherwise
 */
def udfCreateFlagModifiedString(df_currentFullTable_key: String, df_currentPreviousTable_key: String,
CurrentStringModified: String, PreviousStringModified: String): String = {
  // Require a non-null key: two null keys compare equal with ==, but they do not identify
  // a matched row, so such a row must not be reported as updated.
  val sameRow = df_currentFullTable_key != null && df_currentFullTable_key == df_currentPreviousTable_key
  // == / != on String is null-safe in Scala, so a null value versus a non-null value counts as a change.
  if (sameRow && CurrentStringModified != PreviousStringModified) "U" else null
}