Распространяет ли хранилище состояний в Kafka Streams дублирующиеся записи в последующих темах? - PullRequest
1 голос
/ 13 января 2020

В настоящее время я использую Kafka Streams для приложения с сохранением состояния. Состояние не хранится в хранилище состояний Kafka, а скорее просто в памяти на данный момент. Это означает, что всякий раз, когда я перезапускаю приложение, все состояние теряется, и его нужно восстанавливать, обрабатывая все записи с самого начала.

После проведения некоторых исследований хранилищ состояний Kafka, кажется, это именно то решение, которое я ищу постоянное состояние между перезапусками приложения (в памяти или на диске). Однако я нахожу, что в ресурсах онлайн отсутствуют некоторые довольно важные детали, поэтому у меня все еще есть пара вопросов о том, как это будет работать точно:

  • Если поток настроен на запуск со смещения latest, будет ли состояние (повторно) рассчитываться из всех предыдущих записей?
  • Если ранее обработанные записи необходимо повторно обработать для перестройки состояния, будет ли это распространять записи через остальную топологию Streams (например, InputTopi c -> процессор с отслеживанием состояния -> OutputTopi c, приведет ли это к дублированию записей в OutputTopi c из-за состояния перестроения)?

Ответы [ 2 ]

1 голос
/ 14 января 2020

Если поток настроен на запуск со смещением latest, будет ли по-прежнему вычисляться (ре) состояние из всех предыдущих записей?

Если вы перезапускаете то же приложение (например, после того, как оно было остановлено ранее), тогда состояние не будет пересчитано путем повторной обработки исходных входных данных. Вместо этого состояние будет восстановлено из его «резервной копии» (каждое хранилище состояний или KTable постоянно хранится в топике Kafka c, так называемой «топологии изменений c» этой таблицы / хранилища состояний для таких целей) так что его данные в точности соответствуют тому, что было, когда приложение было остановлено. Такое поведение позволяет беспрепятственно останавливать и перезапускать ваши приложения, не пропуская записи, поступившие между «остановкой» и «перезапуском».

Но есть другое предостережение, о котором вам необходимо знать: конфигурация для установки начальная точка смещения (latest или earliest) используется только при первом запуске приложения Kafka Streams . После этого всякий раз, когда вы останавливаете + перезапускаете свое приложение, оно всегда будет продолжать работу с того места, где оно ранее было остановлено. Это связано с тем, что если приложение запускалось хотя бы один раз, оно сохраняло информацию о смещении потребителя в Kafka, что позволяет ему узнать, откуда автоматически возобновлять операции после перезапуска.

Если вам нужны другие всегда (пере), начиная, например, со смещений latest (таким образом, потенциально пропуская записи, поступившие между ними, когда вы остановили приложение и перезапустили его), вы должны сбросить ваше приложение Kafka Streams . Одним из шагов, который выполняет инструмент сброса, является удаление информации о смещении потребителя приложения из Kafka, что заставляет приложение думать, что оно никогда не запускалось, так сказать.

Если ранее уже обработанные записи необходимо будет повторно обработан для восстановления состояния, будет ли это распространять записи через остальную топологию Streams (например, InputTopi c -> процессор с сохранением состояния -> OutputTopi c, это приведет к дублированию записей в OutputTopi c, потому что состояния восстановления)?

Эта повторная обработка не будет выполняться по умолчанию, как описано выше. Состояние будет автоматически восстановлено до его прежнего состояния (предназначенного для каламбура) в тот момент, когда приложение было остановлено.

Повторная обработка будет происходить только в том случае, если вы вручную сбросите свое приложение (см. Выше) и, например, сконфигурируете приложение для повторной обработки. читать исторические данные (например, установить auto.offset.reset на earliest после того, как вы сделали сброс).

1 голос
/ 13 января 2020

Государственные хранилища используют свои собственные темы changelog, и государственные хранилища kafka-streams берут на себя ответственность за загрузку из них. Если ваши хранилища состояний не инициализированы, ваше приложение kafka-streams будет повторно гидрировать свое локальное хранилище состояний из списка изменений topi c, используя EARLIEST, поскольку оно должно читать каждую запись.

Это означает последовательность запуска для новый экземпляр примерно такой:

  • Обратите внимание, что нет локального кэша хранилища состояний
  • Загрузите локальное хранилище состояний, используя topi c журнала изменений для хранилища состояний ( topi c имя хранилища состояний <state-store-name>-changelog)
  • Читать каждую запись и соответственно обновлять локальный экземпляр rockDDB
  • Не излучать ничего, поскольку это служба приложения, а не ваша фактическая топология
  • Считайте смещения групп потребителей, используя EARLIEST или LATEST в соответствии с настройкой топологии. Это не проблема, если ваша группа потребителей еще не имеет смещений
  • Обрабатывает вещи, отправляя записи в соответствии с топологией

Независимо от того, установлена ​​ли у вас фактическая топология auto.offset.reset до LATEST или EARLIEST зависит от вас. Если они потеряны или вы создаете новую группу, это баланс между возможным пропуском записей (LATEST) и обработкой повторной обработки старых записей и дедупликацией (EARLIEST),

Короче говоря: восстановление состояния отличается от обработки и обрабатывается потоками kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...