Как сравнить столбцы двух таблиц с помощью Spark? - PullRequest
0 голосов
/ 30 октября 2019

Я пытаюсь сравнить две таблицы (), читая как DataFrames. И для каждого общего столбца в этих таблицах используется конкатенация первичного ключа, например, order_id с другими столбцами, такими как order_date, order_name, order_event.

Код Scala, который я использую

val primary_key=order_id
for (i <- commonColumnsList){
      val column_name = i
      val tempDataFrameForNew = newDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")
      val tempDataFrameOld = oldDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")

      //Get those records which aren common in both old/new tables
      matchCountCalculated = tempDataFrameForNew.intersect(tempDataFrameOld)
      //Get those records which aren't common in both old/new tables
      nonMatchCountCalculated = tempDataFrameOld.unionAll(tempDataFrameForNew).except(matchCountCalculated)

      //Total Null/Non-Null Counts in both old and new tables.
      nullsCountInNewDataFrame = newDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
      nullsCountInOldDataFrame = oldDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
      nonNullsCountInNewDataFrame = newDFCount - nullsCountInNewDataFrame
      nonNullsCountInOldDataFrame = oldDFCount - nullsCountInOldDataFrame

      //Put the result for a given column in a Seq variable, later convert it to Dataframe.
      tempSeq = tempSeq :+ Row(column_name, matchCountCalculated.toString, nonMatchCountCalculated.toString, (nullsCountInNewDataFrame - nullsCountInOldDataFrame).toString,
       (nonNullsCountInNewDataFrame - nonNullsCountInOldDataFrame).toString)
}
     // Final Step: Create DataFrame using Seq and some Schema.
     spark.createDataFrame(spark.sparkContext.parallelize(tempSeq), schema)

Код вышеработает нормально для среднего набора данных, но по мере увеличения количества столбцов и записей в моей новой и старой таблице время выполнения увеличивается. Любые советы приветствуются. Заранее спасибо.

1 Ответ

0 голосов
/ 30 октября 2019

Вы можете сделать следующее:
1. Внешнее соединение старого и нового фрейма данных с первичным ключом
joined_df = df_old.join(df_new, primary_key, "outer") 2. Кэшировать его, если возможно. Это сэкономит вам много времени
3. Теперь вы можете перебирать столбцы и сравнивать столбцы, используя функции искры (.isNull для несоответствия, == для сопоставления и т. Д.)

for (col <- df_new.columns){
  val matchCount = df_joined.filter(df_new[col].isNotNull && df_old[col].isNotNull).count()
  val nonMatchCount = ...
}

Этодолжно быть значительно быстрее, особенно когда вы можете кэшировать ваш фрейм данных. Если вы не можете, это может быть хорошей идеей, поэтому сохраняйте df на диск, чтобы избежать случайного перемешивания

...