При заданном наборе данных моментальных снимков состояний в момент времени t, как преобразовать его в набор данных с эффективным временем начала и окончания каждого состояния? - PullRequest
0 голосов
/ 26 декабря 2018

Учитывая набор данных S, цель состоит в том, чтобы создать набор данных E, где S и E определены ниже:

// event where start (s) is inclusive, end (e) is exclusive
case class E(id: Int, state: String, s: Int, e: Option[Int])

//snapshot with state at t for an id
case class S(id: Int, state: String, time: Int)

//Example test case
val ss: Dataset[S] = Seq(S(100, "a", 1), S(100, "b", 2), S(100, "b", 3), S(100, "a", 4), S(100, "a", 5), S(100, "a", 6), S(100, "c", 9))
      .toDS

val es: Dataset[E] = ss
      .toEs

es.collect() must contain theSameElementsAs
      Seq(E(100, "a", 1, Some(2)), E(100, "b", 2, Some(4)), E(100, "a", 4, Some(9)), E(100, "c", 9, None))

Состояние может иметь несколько снимков (в разное время), но вывод долженнакапливать эффективное время начала и окончания.Также предполагается, что последнее активное состояние не имеет конечной даты (опция) в выходных данных.

toEs выше определяется следующим образом:

implicit class SOps(ss: Dataset[S]) {
    def toEs(implicit spark: SparkSession): Dataset[E] = ???
}

Следующий рисунок описывает желаемое преобразование

1 Ответ

0 голосов
/ 03 января 2019

Ниже приведено решение с использованием flatMapGroups, которое выливается на диск, если группа слишком велика, чтобы поместиться в памяти.

def toEs(implicit spark: SparkSession): Dataset[E] = {
  import spark.implicits._

  ss
    .sort(ss("id"), ss("t"))
    .groupByKey(s => s.id)
    .flatMapGroups { (_, ss) =>
      new Iterator[E] {
        var nextStart: Option[S] = None

        override def hasNext: Boolean = ss.hasNext || nextStart.isDefined

        override def next(): E = {
          if (ss.hasNext) {

            val start = nextStart.getOrElse(ss.next())
            var last = ss.next()

            while (last.state == start.state)
              last = ss.next()

            nextStart = Some(last)
            E(start.id, start.state, start.t, Some(last.t))
          } else {
            val Some(start) = nextStart
            nextStart = None
            E(start.id, start.state, start.t, None)
          }
        }
      }
    }
}

Это выглядит супер императивом, поэтому не очень счастливым: (

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...