Не тривиальная проблема, но вот решение с шагами следующим образом:
- Создание UDF для вычисления следующего ближайшего 30-минутного времени окончания события
event_ts_end
с использованием java.time
API
- Использовать оконную функцию
lag
для времени события из предыдущего ряда
- Используйте
when/otherwise
для генерации столбца event_ts_start
со значением null
, если разница во времени события из предыдущей строки составляет 30 минут
- Используйте оконную функцию
last(event_ts_start, ignoreNulls=true)
для обратной засыпки null
с последним event_ts_start
значением
- Группировать данные по
event_ts_start
для агрегирования event_duration
и event_ts_end
Сначала давайте соберем примерный набор данных:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val df = Seq(
(101, "2019-04-01 09:00", 800),
(101, "2019-04-01 09:30", 1800),
(101, "2019-04-01 10:00", 2700),
(101, "2019-04-01 12:00", 1000),
(101, "2019-04-01 13:00", 1000),
(220, "2019-04-02 10:00", 1500),
(220, "2019-04-02 10:30", 2400)
).toDF("event_id", "event_time", "event_duration")
Обратите внимание, что примерный набор данных был немного обобщен, чтобы включать более одного события, и сделать время события включающим date
информацию, чтобы охватить случаи, когда событие пересекает данную дату.
Шаг 1
:
import java.sql.Timestamp
def get_next_closest(seconds: Int) = udf{ (ts: Timestamp, duration: Int) =>
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
val iter = Iterator.iterate(ts.toLocalDateTime)(_.plusSeconds(seconds)).
dropWhile(_.isBefore(ts.toLocalDateTime.plusSeconds(duration)))
iter.next.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
}
Шаги 2 - 5
:
val winSpec = Window.partitionBy("event_id").orderBy("event_time")
val seconds = 30 * 60
df.
withColumn("event_ts", to_timestamp($"event_time", "yyyy-MM-dd HH:mm")).
withColumn("event_ts_end", get_next_closest(seconds)($"event_ts", $"event_duration")).
withColumn("prev_event_ts", lag($"event_ts", 1).over(winSpec)).
withColumn("event_ts_start", when($"prev_event_ts".isNull ||
unix_timestamp($"event_ts") - unix_timestamp($"prev_event_ts") =!= seconds, $"event_ts"
)).
withColumn("event_ts_start", last($"event_ts_start", ignoreNulls=true).over(winSpec)).
groupBy($"event_id", $"event_ts_start").agg(
sum($"event_duration").as("event_duration"), max($"event_ts_end").as("event_ts_end")
).show
// +--------+-------------------+--------------+-------------------+
// |event_id| event_ts_start|event_duration| event_ts_end|
// +--------+-------------------+--------------+-------------------+
// | 101|2019-04-01 09:00:00| 5300|2019-04-01 11:00:00|
// | 101|2019-04-01 12:00:00| 1000|2019-04-01 12:30:00|
// | 101|2019-04-01 13:00:00| 1000|2019-04-01 13:30:00|
// | 220|2019-04-02 10:00:00| 3900|2019-04-02 11:30:00|
// +--------+-------------------+--------------+-------------------+