Почему смещения группы потребителей (app-id) моего приложения Kafka Streams сбрасываются после перезапуска приложения? - PullRequest
0 голосов
/ 11 января 2019

У меня есть приложение Kafka Streams, для которого при каждом перезапуске смещения для темы, которую он потребляет, сбрасываются. Следовательно, для всех разделов лаги увеличиваются, и приложению необходимо повторно обработать все данные.

UPDATE: Выходная тема получает серию событий, которые уже были обработаны после перезапуска приложения. Дело не в том, что смещения входной темы сбрасываются, как я уже говорил в предыдущем абзаце. Однако смещения внутренней темы ( KTABLE-SUPPRESS-STATE-STORE ) сбрасываются, см. Комментарии ниже.

Я гарантировал, что задержка равна 1 для каждого раздела перед перезапуском (это относится к теме вывода). Все потребители, которые принадлежат этому идентификатору группы потребителей (app-id), являются активными. Перезапуск происходит мгновенно, это занимает около 30 секунд.

Приложение использует только один раз в качестве гарантии обработки.

Я прочитал этот ответ Как истекает смещение для группы потребителей Apache Kafka? .

Я пробовал с auto.offset.reset = последний и auto.offset.reset = самый ранний .

Похоже, что смещения для этих тем не зафиксированы (но я не уверен в этом).

Я предполагаю, что после перезапуска приложение должно получить из последнего зафиксированного смещения для этой группы потребителей.

UPDATE: Я предполагаю, что это для внутренней темы ( KTABLE-SUPPRESS-STATE-STORE )

Обеспечивает ли API-интерфейс Kafka Stream фиксацию всего потребленного смещения перед выключением? (после вызова streams.close () )

Буду очень признателен за любую подсказку по этому поводу.

UPDATE

Это код, который выполняет приложение:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...);

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))              
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, new) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

Сброс смещения просто и всегда происходит (после перезапуска) с внутренней темой KTABLE-SUPPRESS-STATE-STORE , созданной API Kafka Stream.

Я пытался с гарантией обработки ровно один раз и хотя бы один раз .

Еще раз, я буду очень признателен за любую подсказку об этом.

UPDATE: Это было решено в выпуске 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895)

Ответы [ 3 ]

0 голосов
/ 17 января 2019

Сброс смещения просто и всегда происходит (после перезапуска) с внутренней темой KTABLE-SUPPRESS-STATE-STORE, созданной API-интерфейсом Kafka Stream.

Это ожидаемое поведение (версия 2.1), поскольку оператор suppress() работает только в оперативной памяти. Таким образом, при перезапуске буфер подавления должен быть воссоздан из раздела изменений, прежде чем начнется обработка.

Обратите внимание, что в следующих выпусках планируется разрешить suppress() записи на диск (см. https://issues.apache.org/jira/browse/KAFKA-7224).) Это позволит избежать накладных расходов при воссоздании буфера из раздела изменений.

0 голосов
/ 25 января 2019

Я думаю, что ответ @Matthias J. Sax охватывает большинство внутренних элементов подавления. Я должен уточнить одну вещь: когда вы говорите «перезапустите приложение», что именно вы делали? Вы корректно завершили работу всего приложения, а затем перезапустили его?

0 голосов
/ 11 января 2019

Частота фиксации контролируется параметром commit.interval.ms. Проверьте, действительно ли ваши смещения зафиксированы. По умолчанию смещения фиксируются каждые 100 мс или 30 секунд, в зависимости от конфигурации гарантии обработки. Проверьте это из

...