Учитывая огромный набор данных о событиях, каждое из которых имеет свое время начала и окончания следующим образом:
+------+--------------------+--------------------+
|id | startTime| endTime|
+------+--------------------+--------------------+
| 1|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 2|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 3|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 4|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 5|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 6|2018-01-01 00:00:...|2018-01-01 00:00:...|
+------+--------------------+--------------------+
Как подсчитать количество одновременных событий в любой момент времени?следующим образом:
+--------------------+-----+
| time|count|
+--------------------+-----+
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 2|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 0|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 2|
|2018-01-01 00:00:...| 3|
|2018-01-01 00:00:...| 2|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 0|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 0|
+--------------------+-----+
Это для варианта использования batch
, и ниже приводится попытка использования Windows (в надежде, что существуют другие более элегантные / производительные решения, использующие Spark
):
case class EventWithEnd(source: String, startTime: Timestamp, endTime: Timestamp)
val eventsWithEnd: Dataset[EventWithEnd] = ...
val ws = Window.orderBy("time").rowsBetween(Long.MinValue, 0)
eventsWithEnd
.flatMap(e => List(EventTime(e.startTime, "START"), EventTime(e.endTime, "END")))
.orderBy(asc("time"))
.withColumn("starts", count(when(col("eventType") === "START", true)) over ws)
.withColumn("ends", count(when(col("eventType") === "END", true)) over ws)
.withColumn("count", col("starts") - col("ends"))
.drop("eventType", "starts", "ends")