Думаю, мне не следует соглашаться с Гиоргосом, хотя, возможно, я просто неправильно понимаю его точку зрения.
Производители и потребители только когда-либо общаются с лидером раздела - другие реплики просто находятся в режиме ожидания на случай, если лидер выходит из строя, и они действуют как потребители для лидера, чтобы поддерживать свои данные в актуальном состоянии.
Однако при запуске приложения клиентский код может подключиться к любому из посредников и узнает лидеров разделов при получении метаданных. Эти метаданные затем кешируются на стороне клиента, но если на стороне посредника происходит смена лидера, то есть когда вы увидите исключение NotLeaderForPartitionException, и это заставит клиентский код снова извлечь метаданные, чтобы получить текущий набор лидеров раздела. , Выбор лидера требует времени, и поэтому в этом процессе будет некоторая задержка, но это признак того, что репликация и устойчивость на уровне брокера работают правильно.
На стороне потребителя ручное принятие или автоматическое принятие сделают нет разницы, если вы используете стандартную фиксацию смещения topi c - автокоммит просто означает, что каждый раз при опросе будут обрабатываться предыдущие обработанные сообщения (фактически, возможно, не КАЖДЫЙ опрос), но это, скорее всего, то же самое, что вы делали бы вручную. Хранение смещений в базе данных поможет сохранить все транзакции, если обработка сообщения означает обновление данных в этой базе данных - в этом случае вы можете зафиксировать смещения и обработанные данные в одной и той же транзакции БД.
По сути, я вполне вы наверняка понимаете, что дубликаты - это неизбежная часть масштабируемости потребителя, поскольку она позволяет любому процессу потребителя выбрать раздел и go от последнего зафиксированного смещения. Дубликаты возникают, когда потребитель обработал часть пакета, а затем считается, что он находится в автономном режиме, либо потому, что процесс фактически завершился, либо из-за того, что обработка пакета заняла слишком много времени. Чтобы избежать дублирования, вы должны убедиться, что каждое обработанное сообщение связано с фиксацией в одной и той же транзакции. Обычно это стоимость пропускной способности, но поскольку вы предлагаете ручную фиксацию каждого сообщения, а не на уровне пакета, а сохранение смещений в одной и той же транзакции БД может предотвратить дублирование потребления.
По вопросу, почему перебалансировка На это есть только две причины - изменение количества разделов в топи c или предполагаемое изменение в составе группы потребителей. Для этого есть две возможные причины - остановка потока пульса, что обычно означает остановку приложения-потребителя или обработку пакета, превышающую max.poll.interval.ms (эта конфигурация предназначена для остановки livelock, когда потребитель активен и отправляет тактовые импульсы, но прекратил опрос). Это последняя нормальная причина перебалансировок за пределами перезапусков приложений - неизбежно иногда возникает некоторая задержка где-то в любой системе, и поэтому перебалансировки потребителей обычно считаются нормальными, если они не происходят слишком часто из-за небольшой задержки при обработке пакета.
Я не уверен в проблемах со стороны производителя - в моем случае я обрабатываю дубликаты у потребителя, а в производителе я просто разрешаю большое количество повторных попыток , с acks = all (необходимо, если вы не можете позволить себе потерять сообщения) и 1 максимальный запрос в полете (для обеспечения заказа). Связаны ли тайм-ауты производителя с NotLeaderForPartitionException? Это просто из-за лидерских выборов?
(есть еще некоторые подробности в https://chrisg23.blogspot.com/2020/02/kafka-acks-configuration-apology.html - немного бессвязное сообщение в блоге, но может быть интересным)