Мне нужно отправить предупреждение, если в теме не было получено ни одного события в течение определенного периода времени. Как лучше всего решить этот вариант использования с KafkaStream?
Я пытался:
1) windowedBy вместе с оператором подавления :
stream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)).grace(Duration.ZERO))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((k, v) -> v == 0)
.toStream()
.map((windowId, count) -> KeyValue.pair(windowId.key(), AlarmEvent.builder().build()))
.to(ALARMS, Produced.with(Serdes.String(), AlarmEvent.serde()));
Но, похоже, что окно не закроется, пока не произойдет событие после истечения срока действия, поэтому ни один сигнал тревоги не может быть отправлен точно после определенного времени ожидания.
2) Используя процессор API с пунктатором , похоже, что он работает, но я тестировал только с TopologyTestDriver и advanceWallClockTime () . Не уверен, что advanceWallClockTime () отражает аванс в реальном времени или изменится только при получении события, что приведет к возврату к проблеме в 1).
3) Если пунктуатор работает, я хотел бы использовать его в ValueTranformer , чтобы извлечь выгоду из топологии DSL. Однако я сталкиваюсь с проблемой, описанной в Как переслать событие вниз по течению от экземпляра Punctuator в ValueTransformer? . Невозможно отправить событие ниже по течению от экземпляра пунктуатора.
4) Наконец, у меня возникла идея регулярно вводить фиктивные события (например, каждую секунду) для каждого раздела, чтобы искусственно заставить внутренние часы двигаться вперед. Это позволило бы мне использовать чистое и простое окно DSL и подавлять операторы.