С session.timeout.ms
вы контролируете только тайм-ауты из-за пульса, это означает, что прошло с момента последнего пульса session.timeout.ms
миллисекунды, и кластер объявляет вас как мертвый узел и вызывает перебаланс.
До KIP-62 сердцебиение было отправлено в опросе, но теперь оно перемещено в определенный фоновый поток, чтобы избежать его выселения из кластера, если вы тратите больше времени, чем session.timeout.ms
, на вызов другого poll()
.Разделение тактового импульса для определенного потока отделяет процесс обработки от сообщения кластеру о том, что вы работаете и работаете, но это создает риск возникновения «живых» блокировок, в которых процесс активен, но не прогрессирует, так что помимо того, что пульс становится независимымиз poll
был введен новый тайм-аут, чтобы гарантировать, что потребитель жив и прогрессирует.В документации говорится о реализации до KIP-62:
Пока потребитель отправляет тактовые импульсы, он в основном удерживает блокировку назначенных ему разделов.Если процесс перестает функционировать таким образом, что он не может прогрессировать, но, тем не менее, продолжает посылать тактовые импульсы, то ни один другой участник в группе не сможет захватить разделы, что приводит к увеличению задержки.Однако тот факт, что сердцебиение и обработка выполняются в одном и том же потоке, гарантирует, что потребители должны добиться прогресса, чтобы сохранить свое назначение.Любая остановка, которая влияет на обработку, также влияет на сердцебиение.
Изменения, внесенные KIP-62, включают в себя:
Отключение тайм-аута обработки: мы предлагаем ввести отдельное локально принудительное выполнениетайм-аут для обработки записи и фоновый поток для сохранения активности сеанса до истечения этого тайм-аута.Мы называем этот новый тайм-аут как «тайм-аут процесса» и выставляем его в конфигурации потребителя как max.poll.interval.ms.Этот конфиг устанавливает максимальную задержку между клиентскими вызовами для poll ()
Из опубликованных вами журналов, я думаю, вы можете оказаться в такой ситуации, ваше приложение занимает больше времени, чем max.poll.interval.ms
(5 мин.по умолчанию) для обработки 200 опрошенных записей.Если вы находитесь в этом сценарии, вы можете только еще больше уменьшить max.poll.records
или увеличить max.poll.interval.ms
.
PD:
Конфигурация max.poll.interval.ms
, отображаемая в вашем журнале, относится к (по крайней мере) kafka 0.10.1.0, поэтому я предполагаю, что вы допустили небольшую ошибку.
Обновление
Поправьте меня, если я вас неправильно понял, но в своем последнем комментарии вы говорили, что создаете 25 потребителей (например, 25 org.apache.kafka.clients.consumer.KafkaConsumer
, если вы использовали Java) и добавляете их вN разных тем, но с использованием одного и того же group.id
.Если это правильно, вы будете видеть перебалансировку каждый раз, когда KafkaConsumer
запускается или останавливается, потому что он отправит сообщение JoinGroup
или LeaveGroup
(см. Соответствующий протокол kafka ), который содержит group.id
и member.id
(member.id
не является хостом, поэтому два потребителя, созданные в одном и том же процессе, будут по-прежнему иметь разные идентификаторы).Обратите внимание, что это сообщение не содержит информацию о подписке на темы (хотя эта информация должна быть у посредников, но kafka не использует ее для ребалансировки).Поэтому каждый раз, когда кластер получает JoinGroup
или LeaveGroup
для group.id
X, он будет вызывать перебалансировку для всех потребителей с одинаковым group.id
X.
Если вы запустите 25 потребителей сто же самое group.id
вы будете видеть перебалансировку до тех пор, пока не будет создан последний потребитель, и соответствующая перебалансировка не прекратится (если вы продолжаете видеть это, вы, возможно, останавливаете потребителей).1054 *.
Если у нас есть два KafkaConsumer, использующие один и тот же group.id (работающий в одном и том же процессе или в двух разных процессах), и один из них закрыт, это вызывает перебалансировку в другом KafkaConsumer, даже если онибыли подписаны на разные темы.Я полагаю, что брокеры должны учитывать только group.id для перебалансировки, а не подписанные темы, соответствующие паре (group_id, member_id) LeaveGroupRequest, но мне интересно, является ли это ожидаемым поведением или это то, что должно быть улучшен?
Я предполагаю, что это, вероятно, первый вариант, чтобы избежать более сложного перебалансирования в брокере, и учитывая, что решение очень простое, то есть просто используйте разные групповые идентификаторы для разных KafkaConsumer, которые подписываются на разные темы, даже если они работают в одном и том же процессе.
Когда происходит восстановление баланса, мы видим повторяющиеся сообщения
Это исключительное поведение, один потребитель потребляет сообщение, но перед фиксацией смещения запускается перебалансировка и происходит сбой при фиксации. Когда перебалансирование завершится, процесс, который будет иметь такое назначение темы, снова будет использовать сообщение (до успешной фиксации).
Я разделился на две группы, теперь проблема внезапно исчезла с последних 2 часов.
Здесь вы бьете по гвоздю, но если вы не хотите, чтобы какой-либо (можно избежать) ребалансировался, вы должны использовать разные group.id
для каждой темы.
Вот отличный разговор о различных сценариях перебалансировки.