Учитывая два набора данных S и R, оба со столбцом времени (t), как описано ниже:
//snapshot with id at t
case class S(id: String, t: Int)
//reference data at t
case class R(t: Int, fk: String)
//Example test case
val ss: Dataset[S] = Seq(S("a", 1), S("a", 3), S("b", 5), S("b", 7))
.toDS
val rs: Dataset[R] = Seq(R(0, "a"), R(2, "a"), R(6, "b"))
.toDS
val srs: Dataset[(S, Option[R])] = ss
.asOfJoin(rs)
srs.collect() must contain theSameElementsAs
Seq((S("a", 1), Some(R(0, "a"))), (S("a", 3), Some(R(2, "a"))), (S("b", 5), None), (S("b", 7), Some(R(6, "b"))))
Цель состоит в том, чтобы найти самую последнюю строку в R, которая соответствует идентификатору E, если это возможно, т.е. R можетбыть необязательным в выходных данных.
asOfJoin
определяется следующим образом:
implicit class SOps(ss: Dataset[S]) {
def asOfJoin(rs: Dataset[R])(implicit spark: SparkSession): Dataset[(S, Option[R])] = ???
}
Одно из решений с использованием API набора данных выглядит следующим образом:
def asOfJoin(rs: Dataset[R])(implicit spark: SparkSession): Dataset[(S, Option[R])] = {
import spark.implicits._
ss
.joinWith(
rs,
ss("id") === rs("fk") && ss("t") >= rs("t"),
"left_outer")
.map { case (l, r) => (l, Option(r)) }
.groupByKey { case (s, _) => s }
.reduceGroups { (x, y) =>
(x, y) match {
case ((_, Some(R(tx, _))), (_, Some(R(ty, _)))) => if (tx > ty) x else y
case _ => x
}
}
.map { case (_, r) => r }
}