Учитывая набор данных S, цель состоит в том, чтобы создать набор данных E, где S и E определены ниже:
// event where start (s) is inclusive, end (e) is exclusive
case class E(id: Int, state: String, s: Int, e: Option[Int])
//snapshot with state at t for an id
case class S(id: Int, state: String, time: Int)
//Example test case
val ss: Dataset[S] = Seq(S(100, "a", 1), S(100, "b", 2), S(100, "b", 3), S(100, "a", 4), S(100, "a", 5), S(100, "a", 6), S(100, "c", 9))
.toDS
val es: Dataset[E] = ss
.toEs
es.collect() must contain theSameElementsAs
Seq(E(100, "a", 1, Some(2)), E(100, "b", 2, Some(4)), E(100, "a", 4, Some(9)), E(100, "c", 9, None))
Состояние может иметь несколько снимков (в разное время), но вывод долженнакапливать эффективное время начала и окончания.Также предполагается, что последнее активное состояние не имеет конечной даты (опция) в выходных данных.
toEs
выше определяется следующим образом:
implicit class SOps(ss: Dataset[S]) {
def toEs(implicit spark: SparkSession): Dataset[E] = ???
}
Следующий рисунок описывает желаемое преобразование