Рассмотрим два кадра данных data_df
и update_df
. Эти два кадра данных имеют одинаковую схему (key, update_time, bunch of columns)
.
Я знаю два (основных) способа "обновления" data_df
с помощью update_df
- полное внешнее объединение
Я присоединяюсь к двум фреймам данных (по ключу) и затем выбираю соответствующие столбцы (в соответствии со значением update_timestamp) - макс. Над разделом
Объединение обоих фреймов данных , вычислите максимальное значение update_timestamp по ключу, а затем отфильтруйте только те строки, которые равны этому максимуму.
Вот вопросы:
- Есть ли другой способ?
- Какой из них лучший и почему?
Я уже провел сравнение с некоторыми открытыми данными
Вот код соединения
var join_df = data_df.alias("data").join(maj_df.alias("maj"), Seq("key"), "outer")
var res_df = join_df.where( $"data.update_time" > $"maj.update_time" || $"maj.update_time".isNull)
.select(col("data.*"))
.union(
join_df.where( $"data.update_time" < $"maj.update_time" || $"data.update_time".isNull)
.select(col("maj.*")))
А здесь это код окна
import org.apache.spark.sql.expressions._
val byKey = Window.partitionBy($"key") // orderBy is implicit here
res_df = data_df.union(maj_df)
.withColumn("max_version", max("update_time").over(byKey))
.where($"update_time" === $"max_version")
Я могу вставить вам DAG и планы, если это необходимо, но они довольно большие
Мое первое предположение, что объединение может быть лучшим способом, но оно только работает, если update
фрейм данных получил только одну версию для каждого ключа.
PS: я вон Ре Apache Дельта-решение, но, к сожалению, я не могу его использовать.