Выполните итерацию по каждому столбцу в кадре данных и выполните сравнение - PullRequest
0 голосов
/ 19 апреля 2020

Я хочу создать функцию сравнения из таблицы в таблицу на фрейме данных, и я хочу посмотреть, смогу ли я создать более быстрый процесс для оператора if. Я хочу иметь возможность перебирать каждый столбец в кадре данных, и мне любопытно, могу ли я использовать положение столбца или что-то другое вместо того, чтобы создавать конфигурацию для каждого имени столбца. Когда у меня есть 100 столбцов, я не хочу объявлять имя каждого столбца в файле conf, поэтому кто-то может указать мне направление более быстрого метода?

Файл конфигурации:

matching_col1= member_id
matching_col2= activation_idn
matching_col3= addition_dt

Фактический код:

 val mismatches_df_1 = Df1_renamed_matching.except(Df2_renamed_matching)
    if (mismatches_df_1.count() > 0) {

if(DF1_with_err_cols.matching_col1 != DF2_with_err_cols.matching_col1)
{insert into mismatches_df_1 VALUES (DF1_with_err_cols.matching_col1 as matching_col1, ERR_COLUMN                         
= matching_col1, ERR_VALUE_SOURCE = DF1_with_err_cols.matching_col1, ERR_DESCRIPTION = 
matching_col1 + " does not match value in " + source_db_jdbc_table_name2  )}
else{ insert into mismatches_df_1 VALUES(DF1_with_err_cols.matching_col1 as matching_col1) 
  ....
if(DF1_with_err_cols.matching_col14 != DF2_with_err_cols.matching_col14)
{insert into mismatches_df_1 VALUES (DF1_with_err_cols.matching_col14 as matching_col14, 
ERR_COLUMN = matching_col14, ERR_VALUE_SOURCE = DF1_with_err_cols.matching_col14, ERR_DESCRIPTION 
= matching_col14 + " does not match value in " + source_db_jdbc_table_name2  )}
else{ insert into mismatches_df_1 VALUES(DF1_with_err_cols.matching_col14 as matching_col14) }

, поэтому вместо того, чтобы делать это 1-100 раз для таблицы / фрейма данных, есть ли способ создать al oop? или для каждого типа функции, основанной на позиции столбца?

Любая помощь приветствуется!

1 Ответ

0 голосов
/ 19 апреля 2020
package spark

import org.apache.spark.sql.SparkSession

object DataFramesExcept extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._
  case class Sale(
                   corp:    String,
                   product: String,
                   data:    Long,
                   group:   String,
                   sales:   Long,
                   market:  String
                 )

  val df = Seq(
    Sale("A", "Eli", 43831, "A", 100, "I"),
    Sale("A", "Eli", 43831, "B", 100, "I"),
    Sale("A", "Sut", 43831, "A", 80, "I"),
    Sale("A", "Api", 43831, "C", 50, "C or D"),
    Sale("A", "Api", 43831, "D", 50, "C or D"),
    Sale("B", "Konkurent2", 43831, "C", 40, "C or D")
  ).toDF()

  val dfE = Seq(
    Sale("Z", "Eli", 43833, "A", 100, "M"),
    Sale("Z", "Eli", 43833, "B", 100, "M"),
    Sale("Z", "Sut", 43833, "A", 80, "M"),
    Sale("Z", "Api", 43833, "C", 50, "M"),
    Sale("Z", "Api", 43833, "D", 50, "M"),
    Sale("Z", "Konkurent2", 43831, "C", 40, "M")
  ).toDF()

  val cols = df.columns.toList

  cols foreach(c => {
    if(df.select(c).except(dfE.select(c)).rdd.isEmpty() == false) {
      println(s"$c is not empty")
    } else {
      println(s"$c is empty")
    }
  })
// result
//  corp is not empty
//  product is empty
//  data is empty
//  group is empty
//  sales is empty
//  market is not empty

}
...