У меня есть поток, содержащий события Event(Id, Type, Date)
, и я хочу обработать эти события, сгруппированные по (Id, Type) и сеансу активности
Например, из событий
Event1: Event(1, T1, 2018-01-24)
Event2: Event(2, T1, 2018-01-26)
Event3: Event(1, T2, 2018-01-28)
Event4: Event(1, T2, 2018-01-28)
...
Я ожидаю, что у меня будут следующие окна:
Window1 with Event1
Window2 with Event2
Window3 with Event3 and Event4
...
Насколько я понимаю, я смогу сделать это с окнами сеанса времени-события в потоке с ключами.Но с моим кодом печатается только первое окно (Window1), содержащее первое событие (Event1).
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
environment
.addSource(kafkaConsumer.setStartFromEarliest())
.assignTimestampsAndWatermarks(<timestamp assigner>)
.keyBy(e => (e.getId, e.getType))
.window(EventTimeSessionWindows.withGap(Time.days(1)))
.apply(new WindowFunction[Event, String, (String, String), TimeWindow]() {
override def apply(key: (String, String), window: TimeWindow, input: Iterable[Event], out: Collector[String]): Unit = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window $window count: $count")
}
})
.print()
Это подходящий способ работы с историческими событиями и окнами сеанса?