У меня есть приложение Spring Cloud Stream (Kafka Streams версии 2.1) со связывателем Kafka Streams, и я выполняю агрегацию временного окна, где я хочу выполнить какое-либо действие (вызов API) только после закрытия окна. Поведение, которое я наблюдаю, заключается в том, что при каждом перезапуске приложения моя функция mapValues вызывается для каждой записи, хранящейся в журнале изменений, что приводит к огромному количеству обращений к API.
Мое понимание suppress ()заключается в том, что для каждого закрытого временного окна запись о надгробной плите должна отправляться в раздел сводного журнала изменений, что фактически не позволяет мне повторно обрабатывать ее даже после перезапуска приложения.
Что может быть причиной повторной обработки сообщений при перезапуске приложения?
Я уже подтвердил, что приложение не использует исходную тему.
Фрагмент соответствующейкод ниже:
Serde<Aggregator> aggregatorSerde = new JsonSerde<>(Aggregator.class, objectMapper);
Materialized<String, TriggerAggregator, WindowStore<Bytes, byte[]>> stateStore = Materialized.<String, Aggregator, WindowStore<Bytes, byte[]>>
with(Serdes.String(), aggregatorSerde);
KTable<Windowed<String>, List<Event>> windowedEventKTable = inputKStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(5))
.aggregate(Aggregator::new, ((key, value, aggregate) -> aggregate.aggregate(value)), stateStore)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).withName(supressStoreName))
.mapValues((windowedKey, groupedTriggerAggregator) -> {//code here returning a list})
.toStream((k,v) -> k.key())
.flatMapValues((readOnlyKey, value) -> value);