Учитывая события с временем начала и окончания, как посчитать количество одновременных событий, используя Spark? - PullRequest
0 голосов
/ 26 сентября 2018

Учитывая огромный набор данных о событиях, каждое из которых имеет свое время начала и окончания следующим образом:

+------+--------------------+--------------------+
|id    |           startTime|             endTime|
+------+--------------------+--------------------+
|     1|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     2|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     3|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     4|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     5|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     6|2018-01-01 00:00:...|2018-01-01 00:00:...|
+------+--------------------+--------------------+

Как подсчитать количество одновременных событий в любой момент времени?следующим образом:

+--------------------+-----+
|                time|count|
+--------------------+-----+
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    2|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    0|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    2|
|2018-01-01 00:00:...|    3|
|2018-01-01 00:00:...|    2|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    0|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    0|
+--------------------+-----+

Это для варианта использования batch, и ниже приводится попытка использования Windows (в надежде, что существуют другие более элегантные / производительные решения, использующие Spark):

case class EventWithEnd(source: String, startTime: Timestamp, endTime: Timestamp)

val eventsWithEnd: Dataset[EventWithEnd] = ...

val ws = Window.orderBy("time").rowsBetween(Long.MinValue, 0)

eventsWithEnd
    .flatMap(e => List(EventTime(e.startTime, "START"), EventTime(e.endTime, "END")))
    .orderBy(asc("time"))
    .withColumn("starts", count(when(col("eventType") === "START", true)) over ws)
    .withColumn("ends", count(when(col("eventType") === "END", true)) over ws)
    .withColumn("count", col("starts") - col("ends"))
    .drop("eventType", "starts", "ends")

1 Ответ

0 голосов
/ 26 декабря 2018

В этом решении используется API набора данных: дано, если Es вычисляет EC

// event where start (s) is inclusive, end (e) is exclusive
case class E(id: String, s: Int, e: Option[Int])

object E {
  def apply(s: Int, e: Int): E = E(UUID.randomUUID.toString, s, Some(e))
}

//count of events at t
case class EC(t: Int, count: Int)

//transformation
implicit class EOps(es: Dataset[E]) {

    def counts(implicit spark: SparkSession): Dataset[EC] = {
      import spark.implicits._

      val bs: Dataset[B] = es
        .flatMap(e => Seq(B(e.s, "start"), B(e.e.get, "end")))

      val ts: Dataset[Int] = bs
        .map(b => b.t)
        .distinct()

      val bbs: Dataset[(Int, B)] = ts
        .joinWith(
          bs,
          ts("value") >= bs("t"),
          "left_outer")

      bbs
        .groupByKey { case (l, _) => l }
        .mapGroups { case (k, vs) =>
          val count: Int = vs
            .map { case (_, b) => b.bt }
            .foldLeft(0) { (c, v) =>
              v match {
                case "start" => c + 1
                case "end" => c - 1
              }
            }
          EC(k, count)
        }
    }
  }

//Example test case
"EOps" should "return counts at time t given events" in {

    val es: Dataset[E] = Seq(E(1, 3), E(2, 4), E(3, 5))
      .toDS

    val cs: Dataset[EC] = es
      .counts

    //cs.explain()

    cs.collect() must contain theSameElementsAs
      Seq(EC(1, 1), EC(2, 2), EC(3, 2), EC(4, 1), EC(5, 0))
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...