Сначала необходимо объявить псевдоним для кадров данных:
val a = df1.as("a")
val b = df2.as("b")
Создать массив со столбцами, которые не были обновлены
val columnsNotUpdated =
Seq(col("a.Id").as("Id"), col("a.field_D").as("field_D"))
Создать массив с обновленными столбцами, ииспользуйте, когда нужно увидеть, пересекается ли он (b.Id не равен NULL) с фреймом данных b, и, если он пересекается, выберите значение фрейма данных 'b'
val columnsUpdated = a.columns
.filter(x => !Array("Id", "field_D").exists(_ == x))
.map(x =>
when(col("b.Id").isNotNull, col(f"b.$x").as(x))
.otherwise(col(f"a.$x").as(x)))
Наконец, объедините с помощью left_outer и выберитестолбцы
a.join(b, col("a.Id") === col("b.Id"), "left_outer")
.select(columnsNotUpdated.union(columnsUpdated): _*)
Весь код:
val a = df1.as("a")
val b = df2.as("b")
val columnsNotUpdated =
Seq(col("a.Id").as("Id"), col("a.field_D").as("field_D"))
val columnsUpdated = a.columns
.filter(x => !Array("Id", "field_D").exists(_ == x))
.map(x =>
when(col("b.Id").isNotNull, col(f"b.$x").as(x))
.otherwise(col(f"a.$x").as(x)))
a.join(b, col("a.Id") === col("b.Id"), "left_outer")
.select(columnsNotUpdated.union(columnsUpdated): _*)