У меня есть поток - я хочу сравнить количество событий в текущем окне с предыдущим окном.
Это можно сделать, сохранив количество событий в окне в globalState
и сделав что-то по ссылке:
class Foo [I,O] extends ProcessWindowFunction[I,O, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[I], out: Collector[O]): Unit = {
val state = context.globalState.getState(windowStateDescriptor)
if (state.value != null) {
if(state.value > elements.size) {
// do some out.collect
} else {
state.update(elements.size)
}
}
}
}
однако я стараюсь избегать сохранения постоянного состояния. есть ли лучший, более идиоматический способ добиться этого?