С подходом, предоставленным Florian (оконная функция), можно сделать, взяв строки с измененными событиями, а затем взять следующую измененную дату в Scala:
val df = List(
("12-02-18T08:20:00", 1),
("12-02-18T08:25:00", 1),
("12-02-18T08:30:00", 1),
("12-02-18T09:00:00", 2),
("12-02-18T09:05:00", 2),
("12-02-18T09:10:00", 1),
("12-02-18T09:15:00", 1)
).toDF("datetime", "event")
df.show(false)
val w = Window.orderBy("datetime")
val changedRowsOnlyDF = df.withColumn("changed", $"event" =!= lag($"event", 1, 0).over(w))
.where($"changed")
val result = changedRowsOnlyDF
.withColumn("end_time", lead($"datetime", 1).over(w))
.drop("changed")
.withColumnRenamed("datetime", "start_time")
result.show(false)
Вывод:
+-----------------+-----+-----------------+
|start_time |event|end_time |
+-----------------+-----+-----------------+
|12-02-18T08:20:00|1 |12-02-18T09:00:00|
|12-02-18T09:00:00|2 |12-02-18T09:10:00|
|12-02-18T09:10:00|1 |null |
+-----------------+-----+-----------------+
Отказ от ответственности : такой подход может использоваться для небольших объемов данных, Spark уведомляется сообщением:
WARN org.apache.spark.sql.execution.window.WindowExec: Не определен раздел для работы с окном!Перемещение всех данных в один раздел может привести к серьезному снижению производительности.