Я использую потоки Кафки 2.2.1.
Я использую функцию подавления для удержания событий до закрытия окна. Я использую семантику времени события. Однако инициированные сообщения запускаются только после того, как в потоке появляется новое сообщение.
Следующий пример кода извлекается для выявления проблемы:
KStream<UUID, String>[] branches = is
.branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
(key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
(key, value) -> true);
KStream<UUID, String> sideA = branches[0];
KStream<UUID, String> sideB = branches[1];
KStream<Windowed<UUID>, String> sideASuppressed =
sideA.groupByKey(
Grouped.with(new MyUUIDSerde(),
Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
.reduce((v1, v2) -> {
return v1;
})
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
Сообщения передаются только от 'sideASuppressed'когда новое сообщение попадает в поток' sideA '(сообщения, поступающие в' sideB ', не будут вызывать подавления для вывода каких-либо сообщений, даже если время закрытия окна прошло давно). Хотя в производственной среде проблема, скорее всего, не возникает из-за большого объема, есть достаточно случаев, когда важно не ждать нового сообщения, которое попадает в поток 'sideA'.
Заранее спасибо.