Рассмотрим ситуацию, когда население группы потребителей постоянно меняется (новые потребители подключаются к сети или отключаются) или сценарий сбоя вызывает изменение баланса назначений тематических разделов в группе потребителей.
Теперь предположим, что потребителю C0
ранее был присвоен раздел P0
. Этот потребитель с радостью отчитывается и обрабатывает сообщения, публикует новые и т. Д. (Стандартный шаблон потребление-преобразование-публикация.) Происходит событие перебалансировки, в результате которого P0
бесцеремонно (всегда хотел использовать это слово) отзывается из C0
и присвоено C1
. С точки зрения C0
, у него все еще может быть накопленный объем сообщений, и он не обращает внимания на переназначение. Вы попадаете в ситуацию, когда и C0
, и C1
в течение очень короткого периода времени могут полагать, что они оба "владеют" P0
и будут действовать соответственно, создавая дублирующиеся сообщения в исходящей теме и, что еще хуже, имея эти дубликаты могут появляться не по порядку.
Использование transactional.id
включает «ограждение», на которое ссылается оригинальный блог . В рамках переназначения новый производитель будет действовать под увеличенным номером эпохи, в то время как существующий все еще будет использовать старую эпоху. Фехтование тогда тривиально; отбрасывать сообщения, где эпоха истекла.
Есть несколько ошибок с транзакциями Kafka:
- Входящие и исходящие темы должны находиться в одном кластере, чтобы транзакции работали.
- Наименование
transactional.id
имеет решающее значение для «передачи» продюсером, даже если вы не заботитесь о фехтовании зомби. Появление нового производителя будет спровоцировать приведение в порядок любых осиротевших транзакций в полете для истекшего производителя, поэтому необходимо, чтобы идентификатор был стабильным / повторяемым в течение сеансов производителя.
Не используйте случайные идентификаторы для этого; это не только приведет к незавершенным транзакциям (которые блокируют всех потребителей в режиме READ_COMMITTED
), но также накапливает дополнительное состояние в Transactional Coordinator (работает в брокере). По умолчанию это состояние будет сохраняться в течение 7 дней, поэтому вы не хотите порождать произвольно названных транзакционных производителей по прихоти.
- В идеале
transactional.id
отражает комбинацию как входящей темы , так и раздела. (Если, конечно, у вас нет темы с одним разделом.) На практике это означает создание нового транзакционного производителя для каждого раздела, назначенного для потребителя . (Помните, что в сценарии потребление-трансформация-публикация производитель также является потребителем, и назначения разделов потребителя будут меняться с каждым событием перебалансировки.) Взгляните на реализацию spring-kafka , которая лениво создает новый производитель для каждого входящего раздела. (Что-то нужно сказать о безопасности этого подхода и о том, следует ли очищать производителей при переназначении раздела, но это другой вопрос.)
- Механизм ограждения работает только на уровне Кафки . Другими словами, это изолирует потерянного производителя от Кафки, но не от остального мира. Это означает, что если ваш производитель также должен обновить какое-то внешнее состояние (в базе данных, кэше и т. Д.) Как часть цикла потребление-преобразования-публикации, приложение должно отгородиться от базы данных при переназначении раздела. или иным образом обеспечьте идемпотентность обновления.
Просто для полноты, стоит отметить, что это не единственный способ добиться фехтования. API потребителя Kafka предоставляет пользователю возможность зарегистрировать ConsumerRebalanceListener
, что дает перемещенному потребителю последний шанс стереть любое невыполненное невыполненное задание (или сбросить его) перед переназначением разделов новому потребителю. Обратный вызов блокирует; когда он возвращается, предполагается, что обработчик отключил себя локально; тогда и только тогда новый потребитель возобновит обработку.