У меня есть набор данных Spark, проиндексированный по временной метке. Я хотел бы дополнить каждую запись дополнительной информацией: количество событий, произошедших за пять минут (300 секунд) после этого события. Поэтому, если исходные данные состоят из двух столбцов event_id
и timestamp
, я хочу построить третий столбец n counter
, как показано ниже:
event_id timestamp counter
0 0 4
1 100 3
2 150 2
3 250 1
4 275 0
5 600 2
6 610 1
7 750 1
8 950 2
9 1100 1
10 1200 0
Я знаю, что с помощью Spark я могу использовать windows для подсчитывать будущие события в пределах окна фиксированного размера по количеству событий .
val window = Window.orderBy('timestamp).rowsBetween(0, 300)
myDataset.withColumn("count_future_events", sum(lit(1)).over(window))
Но это не интересно, поскольку результат, очевидно, всегда одинаков.
Я sh что-то подобное существовало:
val window = Window.orderBy('timestamp).rowsBetween('timestamp, 'timestamp + 300) // 300 seconds here
Но это не компилируется.
Есть ли способ достичь того, чего я хочу?