Потоки Кафки с suppress (), перерабатывающими changelog - PullRequest
0 голосов
/ 26 октября 2019

У меня есть приложение Spring Cloud Stream (Kafka Streams версии 2.1) со связывателем Kafka Streams, и я выполняю агрегацию временного окна, где я хочу выполнить какое-либо действие (вызов API) только после закрытия окна. Поведение, которое я наблюдаю, заключается в том, что при каждом перезапуске приложения моя функция mapValues ​​вызывается для каждой записи, хранящейся в журнале изменений, что приводит к огромному количеству обращений к API.

Мое понимание suppress ()заключается в том, что для каждого закрытого временного окна запись о надгробной плите должна отправляться в раздел сводного журнала изменений, что фактически не позволяет мне повторно обрабатывать ее даже после перезапуска приложения.

Что может быть причиной повторной обработки сообщений при перезапуске приложения?

Я уже подтвердил, что приложение не использует исходную тему.

Фрагмент соответствующейкод ниже:

    Serde<Aggregator> aggregatorSerde = new JsonSerde<>(Aggregator.class, objectMapper);

    Materialized<String, TriggerAggregator, WindowStore<Bytes, byte[]>> stateStore = Materialized.<String, Aggregator, WindowStore<Bytes, byte[]>>
        with(Serdes.String(), aggregatorSerde);

    KTable<Windowed<String>, List<Event>> windowedEventKTable = inputKStream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(5))
        .aggregate(Aggregator::new, ((key, value, aggregate) -> aggregate.aggregate(value)), stateStore)
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).withName(supressStoreName))
        .mapValues((windowedKey, groupedTriggerAggregator) -> {//code here returning a list})
        .toStream((k,v) -> k.key())
        .flatMapValues((readOnlyKey, value) -> value);
...