Как выполнить полное сравнение базы данных с базой данных, используя apache spark, отдельные поля данных - PullRequest
0 голосов
/ 17 апреля 2020

Я хочу сделать сравнение базы данных с базой данных до уровня отдельного поля данных. Поэтому я хочу разбить его на каждое отдельное поле данных. В настоящее время у меня есть следующие шаги:

    1. Ввести всю таблицу в Spark
    2. Создать df из таблицы улья / таблицы базы данных (все столбцы, все исходные имена)
    3. Удалить несоответствующие поля (нужны только совпадающие столбцы, оригинальные имена)
    4. Создать общее соглашение об именах в файле conf для 2 таблиц
    5. Обрезать столбцы
    6. Добавьте в каждый из 2 DF следующие столбцы:
val df1_dropped_trimmed_renamed_errored= df1_dropped_trimmed_renamed.add("ERR_COLUMN", StringType, true)
.add("ERR_VALUE_SOURCE", StringType, true)
.add("ERR_VALUE_SINK", StringType, true)
.add("ERR_DESCRIPTION", StringType, true)

val df2_dropped_trimmed_renamed_errored= df2_dropped_trimmed_renamed.add("ERR_COLUMN", StringType, true)
.add("ERR_VALUE_SOURCE", StringType, true)
.add("ERR_VALUE_SINK", StringType, true)
.add("ERR_DESCRIPTION", StringType, true)
ЭТО ТАМ, ГДЕ НУЖНА ПОМОЩЬ Текущая мысль как таковая (нашла это из другого источника, но запуталась)
def validateColumns(row: Row): Row = {
var err_col: String = null
var err_val_source: String = null
var err_val_sink: String = null
var err_desc: String = null
val matching_col1 = row.getAs[String]("matchingCol1")**// I want to declare matchingCol1 in my conf file, so when I set the name here, can I use the variable language to do so? ** 
val matching_col2 = row.getAs[String]("matchingCol2")
val matching_col3 = row.getAs[String]("matchingCol3")

Итак, я запутавшись в том, как сделать эту логику c, я хочу заполнить df1_dropped_trimmed_renamed_errored столбцами, которые совпадают / отсутствуют: - но что произойдет, если несколько столбцов не совпадают? - Как я могу заполнить другие поля, я должен даже заполнить другие поля? думая о проблемах с пространством / скоростью

-Так, я думаю, я использую if / else:

val mismatches_df_1 = Df1_dropped_renamed_trimmed.except(Df2_dropped_renamed_trimmed)
If (mismatches_df_1.count() > 0) {


if(Df1_dropped_renamed_trimmed.matching_col1 != Df2_dropped_renamed_trimmed.matching_col1)
 {insert into mismatches_df_1 VALUES (Df1_dropped_renamed_trimmed.matching_col1 as matching_col1, 
err_col = matching_col1, err_val_source = Df1_dropped_renamed_trimmed.matching_col1, err_desc = matching_col1 + " does not match value in " + tbl2  )}
else{ insert into mismatches_df_1 VALUES(Df1_dropped_renamed_trimmed.matching_col1 as matching_col1) }



if( Df1_dropped_renamed_trimmed.matching_col2 != Df2_dropped_renamed_trimmed.matching_col2)
 {insert into mismatches_df_1 VALUES
(Df1_dropped_renamed_trimmed.matching_col2 as matching_col2, 
err_col = matching_col2, err_val_source = Df1_dropped_renamed_trimmed.matching_col2, err_desc = matching_col2 + " does not match value in " + tbl2  )}
else{ insert into mismatches_df_1 VALUES(Df1_dropped_renamed_trimmed.matching_col2 as matching_col2) }



if( Df1_dropped_renamed_trimmed.matching_col3 != Df2_dropped_renamed_trimmed.matching_col3)
 {insert into mismatches_df_1 VALUES
(Df1_dropped_renamed_trimmed.matching_col3 as matching_col3, 
err_col =matching_col3, err_val_source = Df1_dropped_renamed_trimmed.matching_col3, err_desc = matching_col3 + " does not match value in " + tbl2   )}
else{insert into mismatches_df_1 VALUES(Df1_dropped_renamed_trimmed.matching_col3 as matching_col3) 

}

Так что меня беспокоит: когда есть только 1 несоответствие / ошибка, будет ли это заполнять одну строку с другими значениями исходной строки, или мне нужно что-то добавить?

При наличии нескольких несовпадений / ошибок это будет заполнять одну строку ? или это создаст отдельные строки? Я все еще хотел бы заполнить другие поля исходными значениями. **

...