Как выбрать Kafka Transactions.id - PullRequest
0 голосов
/ 14 мая 2018

Интересно, могу ли я получить некоторую помощь в понимании транзакций в Kafka и, в частности, как я использую транзакцию.Вот контекст:

  1. Приложение My Kafka следует шаблону: получает сообщение из входной темы, обрабатывает, публикует в выходную тему.
  2. Я использую не использую API Kafka Streams.
  3. У меня есть несколько потребителей в одной группе потребителей, и каждый потребитель находится в своем собственном потоке опроса.
  4. Существуетпул потоков с рабочими потоками, которые выполняют обработку и публикацию сообщений в выходной теме.На данный момент каждый поток имеет свой собственный экземпляр производителя.
  5. Я использую API опубликованных транзакций, чтобы гарантировать, что обновление смещения потребления и публикация в выходной теме происходят атомарно

Мои предположения на сегодняшний день включают в себя:

  1. Если мой процесс завершится с ошибкой в ​​середине транзакции, то ничего из этой транзакции не будет опубликовано, а смещение потребления не будет перемещено.Таким образом, при перезапуске я бы просто начал транзакцию снова с исходного смещения потребления.
  2. Для транзакции производителя все, что имело значение, было то, что он был уникальным.Поэтому я мог бы сгенерировать идентификатор на основе метки времени при запуске

Затем я прочитал следующий блог: https://www.confluent.io/blog/transactions-apache-kafka/. В частности, в разделе «Как выбрать идентификатор транзакции», кажется,подразумевается, что мне нужно гарантировать, что экземпляр производителя на входной раздел.В нем говорится: «Ключ к правильному ограждению зомби заключается в том, чтобы входные темы и разделы в цикле чтения-процесса-записи всегда были одинаковыми для данного транзакционного.id.».Далее он приводит пример проблемы следующим образом: «Например, в приложении обработки распределенного потока предположим, что тематический раздел tp0 был первоначально обработан транзакцией1.al T0. Если в какой-то момент позже он может быть сопоставлен другому производителю с транзакцией.id T1, между T0 и T1 не будет ограждения. Таким образом, возможно, что сообщения из tp0 могут быть обработаны повторно, нарушая гарантированно единовременную обработку. "

Я не совсем понимаю, почему этодело.На мой взгляд, мне не важно, какой производитель обрабатывает сообщения из любого раздела, если транзакции атомарны.Я боролся с этим в течение дня, и мне интересно, может кто-нибудь сказать мне, что я пропустил здесь.Итак, почему я не могу назначить работу любому экземпляру производителя с любым параметром транзакции.id, если он уникален.И почему они говорят, что сообщения могут просочиться через ограждение, предоставленное транзакциями, если вы сделаете это.

Ответы [ 3 ]

0 голосов
/ 23 марта 2019

При использовании Streams API (в отличие от обычных производителей Kafka) вам не нужно беспокоиться об установке уникального transactional.id для экземпляра вашего потокового приложения. Когда вы включаете семантику Streams exactly_once, API-интерфейс Streams будет генерировать правильный / уникальный транзакции.id на основе темы / раздела.

Проверьте, что именно здесь делается: https://github.com/axbaretto/kafka/blob/fe51708ade3cdf4fe9640c205c66e3dd1a110062/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L455

Задача (ссылается на TaskId в коде) объясняется здесь: https://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks

0 голосов
/ 04 июня 2019

Рассмотрим ситуацию, когда население группы потребителей постоянно меняется (новые потребители подключаются к сети или отключаются) или сценарий сбоя вызывает изменение баланса назначений тематических разделов в группе потребителей.

Теперь предположим, что потребителю C0 ранее был присвоен раздел P0. Этот потребитель с радостью отчитывается и обрабатывает сообщения, публикует новые и т. Д. (Стандартный шаблон потребление-преобразование-публикация.) Происходит событие перебалансировки, в результате которого P0 бесцеремонно (всегда хотел использовать это слово) отзывается из C0 и присвоено C1. С точки зрения C0, у него все еще может быть накопленный объем сообщений, и он не обращает внимания на переназначение. Вы попадаете в ситуацию, когда и C0, и C1 в течение очень короткого периода времени могут полагать, что они оба "владеют" P0 и будут действовать соответственно, создавая дублирующиеся сообщения в исходящей теме и, что еще хуже, имея эти дубликаты могут появляться не по порядку.

Использование transactional.id включает «ограждение», на которое ссылается оригинальный блог . В рамках переназначения новый производитель будет действовать под увеличенным номером эпохи, в то время как существующий все еще будет использовать старую эпоху. Фехтование тогда тривиально; отбрасывать сообщения, где эпоха истекла.

Есть несколько ошибок с транзакциями Kafka:

  • Входящие и исходящие темы должны находиться в одном кластере, чтобы транзакции работали.
  • Наименование transactional.id имеет решающее значение для «передачи» продюсером, даже если вы не заботитесь о фехтовании зомби. Появление нового производителя будет спровоцировать приведение в порядок любых осиротевших транзакций в полете для истекшего производителя, поэтому необходимо, чтобы идентификатор был стабильным / повторяемым в течение сеансов производителя. Не используйте случайные идентификаторы для этого; это не только приведет к незавершенным транзакциям (которые блокируют всех потребителей в режиме READ_COMMITTED), но также накапливает дополнительное состояние в Transactional Coordinator (работает в брокере). По умолчанию это состояние будет сохраняться в течение 7 дней, поэтому вы не хотите порождать произвольно названных транзакционных производителей по прихоти.
  • В идеале transactional.id отражает комбинацию как входящей темы , так и раздела. (Если, конечно, у вас нет темы с одним разделом.) На практике это означает создание нового транзакционного производителя для каждого раздела, назначенного для потребителя . (Помните, что в сценарии потребление-трансформация-публикация производитель также является потребителем, и назначения разделов потребителя будут меняться с каждым событием перебалансировки.) Взгляните на реализацию spring-kafka , которая лениво создает новый производитель для каждого входящего раздела. (Что-то нужно сказать о безопасности этого подхода и о том, следует ли очищать производителей при переназначении раздела, но это другой вопрос.)
  • Механизм ограждения работает только на уровне Кафки . Другими словами, это изолирует потерянного производителя от Кафки, но не от остального мира. Это означает, что если ваш производитель также должен обновить какое-то внешнее состояние (в базе данных, кэше и т. Д.) Как часть цикла потребление-преобразования-публикации, приложение должно отгородиться от базы данных при переназначении раздела. или иным образом обеспечьте идемпотентность обновления.

Просто для полноты, стоит отметить, что это не единственный способ добиться фехтования. API потребителя Kafka предоставляет пользователю возможность зарегистрировать ConsumerRebalanceListener, что дает перемещенному потребителю последний шанс стереть любое невыполненное невыполненное задание (или сбросить его) перед переназначением разделов новому потребителю. Обратный вызов блокирует; когда он возвращается, предполагается, что обработчик отключил себя локально; тогда и только тогда новый потребитель возобновит обработку.

0 голосов
/ 13 сентября 2018

В упомянутой вами статье блога есть вся информация, которую вы ищете, хотя она довольно плотная.

Из раздела Зачем нужны транзакции? в вышеупомянутой статье .

При использовании ванильных производителей и потребителей Kafka, настроенных как минимум на семантику доставки, приложение потоковой обработки может потерять ровно одну семантику обработки следующими способами:

  1. producer.send() может привести к дублированию записи сообщения B из-за внутренних попыток.Это решается идемпотентным производителем и не является предметом остальной части этого поста.

  2. Мы можем повторно обработать входное сообщение A, в результате чего дублированные сообщения B будут записаны на выход,нарушение ровно одной семантики обработки.Повторная обработка может произойти в случае сбоя приложения обработки потока после записи B, но до пометки A как использованной.Таким образом, когда он возобновляет работу, он снова потребляет A и снова пишет B, что приводит к дублированию.

  3. Наконец, в распределенных средах приложения аварийно завершают работу или, что еще хуже, временно теряютподключение к остальной части системы.Как правило, новые экземпляры автоматически запускаются, чтобы заменить те, которые считались потерянными.В ходе этого процесса у нас может быть несколько экземпляров, обрабатывающих одни и те же входные темы и пишущих в одни и те же выходные темы, что приводит к дублированию выходных данных и нарушению семантики обработки, выполняемой ровно один раз.Мы называем это проблемой «экземпляров зомби». [выделение добавлено]

Из раздела Транзакционная семантика в той же статьи.

Ограждение зомби

Мы решаем проблему экземпляров зомби, требуя, чтобы каждому транзакционному производителю был присвоен уникальный идентификатор, называемый transactional.id, Используется для идентификации одного и того же экземпляра производителя при перезапуске процесса. [выделение добавлено]

API требует, чтобы первой операцией транзакционного производителя была явная регистрация его transactional.id с помощьюкластер кафки.Когда это происходит, брокер Kafka проверяет открытые транзакции с указанным transactional.id и завершает их.Это также увеличивает эпоху, связанную с transactional.id.Эпоха - это внутренняя часть метаданных, сохраняемая для каждого transactional.id.

. После того, как эпоха столкнулась, любые производители с таким же transactional.id и более старым периодом считаются зомби и отгораживаются, т.е.,будущие записи транзакций от этих производителей отклоняются. [выделение добавлено]

И из раздела Поток данных в той же статье .

A: взаимодействие производителя и координатора транзакций

При выполнении транзакций производитель делает запросы координатору транзакций в следующих точках:

  1. API initTransactions регистрирует transactional.id с координатором.В этот момент координатор закрывает все ожидающие транзакции с этим transactional.id и поднимает эпоху, чтобы отгородить зомби.Это происходит только один раз за сеанс производителя. [выделение добавлено]

  2. Когда производитель собирается отправлять данные в раздел в первый раз в транзакции, раздел регистрируетсясначала с координатором.

  3. Когда приложение вызывает commitTransaction или abortTransaction, координатору отправляется запрос на запуск протокола двухфазной фиксации.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...