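Каждому DataFrame можно добавить столбец «age» с его порядковым номером (чем больше значение, тем новее данные), затем объединить все DataFrame в один и с помощью оконной функции оставить для каждой пары (CID, PID) только строку из самого свежего набора: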
// --- Sample input -----------------------------------------------------------
// Three snapshots sharing the same (CID, PID, Metric) layout.
// NOTE(review): `toDF` on a local collection presumably relies on
// `spark.implicits._` being imported outside this snippet — confirm.
val columnNames = List("CID", "PID", "Metric")

// Most recent snapshot.
val latest = Seq(
  ("C1", "P1", 10),
  ("C2", "P1", 20),
  ("C2", "P2", 30)
).toDF(columnNames: _*)

// Snapshot taken before `latest`.
val previous = Seq(
  ("C1", "P1", 20),
  ("C2", "P1", 30),
  ("C3", "P1", 40),
  ("C3", "P2", 50)
).toDF(columnNames: _*)

// Earliest snapshot.
val oldest = Seq(
  ("C1", "P1", 30),
  ("C2", "P1", 40),
  ("C3", "P1", 50),
  ("C3", "P2", 60),
  ("C4", "P1", 30)
).toDF(columnNames: _*)
// --- End of sample input ----------------------------------------------------
// Snapshots listed from oldest to newest: the list position is used as the
// "age" tag, so a HIGHER "age" value means a NEWER snapshot.
val dfList = List(oldest, previous, latest)

// Tag every row with the position of the snapshot it came from.
val dfListWithIndexColumn =
  for ((snapshot, position) <- dfList.zipWithIndex)
    yield snapshot.withColumn("age", lit(position))

// Stack all tagged snapshots into one DataFrame (columns are matched by position).
val unitedDF = dfListWithIndexColumn.reduce((stacked, next) => stacked.union(next))

// Within each (CID, PID) group, order rows so the newest snapshot comes first.
val cidPidWindow = Window
  .partitionBy("CID", "PID")
  .orderBy($"age".desc)

// Keep only the rows ranked first in their group, then drop the helper columns.
// Because `rank` assigns equal ranks to ties, a key duplicated inside a single
// snapshot would keep all of its duplicates.
val ranked = unitedDF.withColumn("rank", rank().over(cidPidWindow))
val result = ranked
  .filter($"rank" === 1)
  .drop("age", "rank")

result.show(truncate = false)
Результат:
+---+---+------+
|CID|PID|Metric|
+---+---+------+
|C1 |P1 |10    |
|C2 |P1 |20    |
|C2 |P2 |30    |
|C3 |P1 |40    |
|C3 |P2 |50    |
|C4 |P1 |30    |
+---+---+------+