У меня есть приложение Kafka Streams, для которого при каждом перезапуске смещения для темы, которую он потребляет, сбрасываются. Следовательно, для всех разделов лаги увеличиваются, и приложению необходимо повторно обработать все данные.
UPDATE:
Выходная тема получает серию событий, которые уже были обработаны после перезапуска приложения. Дело не в том, что смещения входной темы сбрасываются, как я уже говорил в предыдущем абзаце. Однако смещения внутренней темы ( KTABLE-SUPPRESS-STATE-STORE ) сбрасываются, см. Комментарии ниже.
Я гарантировал, что задержка равна 1 для каждого раздела перед перезапуском (это относится к теме вывода).
Все потребители, которые принадлежат этому идентификатору группы потребителей (app-id), являются активными.
Перезапуск происходит мгновенно, это занимает около 30 секунд.
Приложение использует только один раз в качестве гарантии обработки.
Я прочитал этот ответ Как истекает смещение для группы потребителей Apache Kafka? .
Я пробовал с auto.offset.reset = последний и auto.offset.reset = самый ранний .
Похоже, что смещения для этих тем не зафиксированы (но я не уверен в этом).
Я предполагаю, что после перезапуска приложение должно получить из последнего зафиксированного смещения для этой группы потребителей.
UPDATE:
Я предполагаю, что это для внутренней темы ( KTABLE-SUPPRESS-STATE-STORE )
Обеспечивает ли API-интерфейс Kafka Stream фиксацию всего потребленного смещения перед выключением? (после вызова streams.close () )
Буду очень признателен за любую подсказку по этому поводу.
UPDATE
Это код, который выполняет приложение:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
.stream(inputTopicNames, Consumed.with(..., ...)
.withTimestampExtractor(...);
events
.filter((k, v) -> ...)
.flatMapValues(v -> ...)
.flatMapValues(v -> ...)
.selectKey((k, v) -> v)
.groupByKey(Grouped.with(..., ...))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
.advanceBy(Duration.ofSeconds(windowSizeInSecs))
.grace(Duration.ofSeconds(windowSizeGraceInSecs)))
.reduce((agg, new) -> {
...
return agg;
})
.suppress(Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()))
.toStream()
.to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
Сброс смещения просто и всегда происходит (после перезапуска) с внутренней темой KTABLE-SUPPRESS-STATE-STORE , созданной API Kafka Stream.
Я пытался с гарантией обработки ровно один раз и хотя бы один раз .
Еще раз, я буду очень признателен за любую подсказку об этом.
UPDATE:
Это было решено в выпуске 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895)