Что будет делать эта функция (ProcessElement), довольно ясно: на основе ключевого потока (с ключом rideId) она будет перебирать все элементы, чей rideId принадлежит этому ключу, и обновит состояние на основе условия
override def processElement(ride: TaxiRide,
context: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#Context,
out: Collector[TaxiRide]): Unit = {
val timerService = context.timerService
if (ride.isStart) {
// the matching END might have arrived first; don't overwrite it
if (rideState.value() == null) {
rideState.update(ride)
}
}
else {
rideState.update(ride)
}
timerService.registerEventTimeTimer(ride.getEventTime + 120 * 60 * 1000)
}
Таймер сработает, когда водяной знак достигнет отметки времени
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#OnTimerContext,
out: Collector[TaxiRide]): Unit = {
val savedRide = rideState.value
if (savedRide != null && savedRide.isStart) {
out.collect(savedRide)
}
rideState.clear()
}
Проблема: если сначала идет запись End, а затем на основе logi c, она будет не обновлять состояние поездки (связанный ключ), тогда он сработает через 2 часа, тогда он не будет собирать и не генерировать запись, но что, если эта запись соответствует нашим требованиям? ==> время начала записи произошло более 2 часов a go? Я думаю, что должно быть больше логи c, чтобы справиться с этим