Kafka Streams с государственными хранилищами - повторная обработка сообщений при перезапуске приложения - PullRequest
1 голос
/ 20 марта 2019

У нас есть следующая топология с двумя преобразователями, и каждый преобразователь использует постоянное хранилище состояний:

kStreamBuilder.stream(inboundTopicName)
    .transform(() -> new FirstTransformer(FIRST_STATE_STORE), FIRST_STATE_STORE)
    .map((key, value) -> ...)
    .transform(() -> new SecondTransformer(SECOND_STATE_STORE), SECOND_STATE_STORE)
    .to(outboundTopicName);

и в настройках Kafka auto.offset.reset: latest. После запуска приложения я вижу, как создаются (и ожидается) две внутренние сжатые темы: appId_inbound_firstStateStore-changelog и appId_inbound_secondStateStore-changelog

Наше приложение не работало в течение двух дней, и после того, как мы снова запустили приложение, сообщения были обработаны с самого начала для определенного раздела (но у нас есть несколько разделов). Я знаю, что зафиксированные смещения хранятся в течение ~ 1 дня для брокеров kafka до версии 2, поэтому наши смещения должны быть очищены путем сохранения. Но почему сообщения были обработаны с самого начала, если мы используем auto.offset.reset: latest? Возможно, это как-то связано с операциями с состоянием или внутренними темами журнала изменений.

Я вижу следующие журналы (большинство из них дублируются несколько раз):

StoreChangelogReader Restoring task 0_55's state store firstStateStore from beginning of the changelog
Fetcher [Consumer clientId=xxx-restore-consumer, groupId=] Resetting offset for partition xxx-55 to offset 0
ConsumerCoordinator Setting newly assigned partitions
ConsumerCoordinator Revoking previously assigned partitions
StreamsPartitionAssignor Assigned tasks to clients
AbstractCoordinator Successfully joined group with generation
StreamThread partition revocation took xxx ms
Unsubscribed all topics or patterns and assigned partitions
AbstractCoordinator (Re-)joining group
Attempt to heartbeat failed since group is rebalancing
AbstractCoordinator Group coordinator xxx:9092 (id: xxx rack: null) is unavailable or invalid, will attempt rediscovery
FetchSessionHandler - [Consumer clientId=xxx-restore-consumer, groupId=] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException

версия брокера Kafka 0.11.0.2; Версия Kafka Streams 2.1.0

...