Я создаю потоковое приложение через Kafka Streams 2.10 и столкнулся с концептуальной проблемой.
The producer1 sends (Key -> Value): Session1 -> RUNNING
The producer2 sends (Key -> Value): Sessionabc -> RUNNING
The producer1 sends (Key -> Value): Session1 -> DONE
Теперь я хочу обнаружить мертвую сессию. Я пытаюсь использовать SessionWindow, но поскольку Кафка вычисляет запись по записи, я не могу вычислить все сразу.
Вот мой фрагмент:
builder
.stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
.groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
.windowedBy(SessionWindows.with(SESSION_DURATION))
.reduce(new SessionReducer())
.toStream((windowed, value) -> windowed.key())
.filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
.peek((a,b)->System.out.println("This Value is missing: \n "+a.toString()+b.toString()));`
Примечание: редуктор просто гарантирует, что когда мы видим DONE независимо от того, какой другой элемент у нас есть для того же сеанса, это всегда будет сделано.
Есть идеи?