Лучший способ обновить фрейм данных в Spark scala - PullRequest
0 голосов
/ 04 февраля 2020

Рассмотрим два кадра данных data_df и update_df. Эти два кадра данных имеют одинаковую схему (key, update_time, bunch of columns).

Я знаю два (основных) способа "обновления" data_df с помощью update_df

  1. полное внешнее объединение
    Я присоединяюсь к двум фреймам данных (по ключу) и затем выбираю соответствующие столбцы (в соответствии со значением update_timestamp)
  2. макс. Над разделом
    Объединение обоих фреймов данных , вычислите максимальное значение update_timestamp по ключу, а затем отфильтруйте только те строки, которые равны этому максимуму.

Вот вопросы:

  • Есть ли другой способ?
  • Какой из них лучший и почему?

Я уже провел сравнение с некоторыми открытыми данными
Вот код соединения

var join_df = data_df.alias("data").join(maj_df.alias("maj"), Seq("key"), "outer")
var res_df = join_df.where( $"data.update_time" > $"maj.update_time" || $"maj.update_time".isNull)
             .select(col("data.*"))
         .union(
         join_df.where( $"data.update_time" < $"maj.update_time" || $"data.update_time".isNull)
             .select(col("maj.*")))

А здесь это код окна

import org.apache.spark.sql.expressions._

val byKey = Window.partitionBy($"key") // orderBy is implicit here

res_df = data_df.union(maj_df)
                .withColumn("max_version", max("update_time").over(byKey))
                .where($"update_time" === $"max_version")

Я могу вставить вам DAG и планы, если это необходимо, но они довольно большие

Мое первое предположение, что объединение может быть лучшим способом, но оно только работает, если update фрейм данных получил только одну версию для каждого ключа.


PS: я вон Ре Apache Дельта-решение, но, к сожалению, я не могу его использовать.

1 Ответ

0 голосов
/ 06 февраля 2020

Ниже приведен один из способов сделать это только для объединения ключей, чтобы минимизировать объем памяти, используемой фильтрами и командами объединения.

///Two records, one with a change, one no change
val originalDF = spark.sql("select 'aa' as Key, 'Joe' as Name").unionAll(spark.sql("select 'cc' as Key, 'Doe' as Name")) 


///Two records, one change, one new
val updateDF =  = spark.sql("select 'aa' as Key, 'Aoe' as Name").unionAll(spark.sql("select 'bb' as Key, 'Moe' as Name"))

///Make new DFs of each just for Key   
val originalKeyDF = originalDF.selectExpr("Key")
val updateKeyDF = updateDF.selectExpr("Key")

///Find the keys that are similar between both
val joinKeyDF = updateKeyDF.join(originalKeyDF, updateKeyDF("Key") === originalKeyDF("Key"), "inner")
///Turn the known keys into an Array
val joinKeyArray = joinKeyDF.select(originalKeyDF("Key")).rdd.map(x=>x.mkString).collect

///Filter the rows from original that are not found in the new file
val originalNoChangeDF = originalDF.where(!($"Key".isin(joinKeyArray:_*)))

///Update the output with unchanged records, update records, and new records
val finalDF = originalNoChangeDF.unionAll(updateDF)
...