Ошибка потребителя Kafka: мертвый координатор маркировки - PullRequest
0 голосов
/ 15 мая 2018

У меня есть тема с 10 разделами в кластере Kafka 0.10.0.1. У меня есть приложение, которое порождает несколько потребительских потоков. По этой теме я порождаю 5 тем. Много раз в журналах приложений я вижу эту запись

INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092
(id:2147483646 rack: null) dead for group notifications-consumer

Тогда есть несколько записей, говорящих (Re-)joining group notifications-consumer. После этого я также вижу одно предупреждение:

Auto commit failed for group notifications-consumer: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time 
message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned by poll() with max.poll.records.

Теперь я уже настроил свой потребительский конфиг так,

props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);

Итак, даже после правильной настройки конфига я все еще получаю эту ошибку. Во время перебалансировки наше приложение полностью не отвечает. Пожалуйста, помогите.

1 Ответ

0 голосов
/ 19 мая 2018

С session.timeout.ms вы контролируете только тайм-ауты из-за пульса, это означает, что прошло с момента последнего пульса session.timeout.ms миллисекунды, и кластер объявляет вас как мертвый узел и вызывает перебаланс.

До KIP-62 сердцебиение было отправлено в опросе, но теперь оно перемещено в определенный фоновый поток, чтобы избежать его выселения из кластера, если вы тратите больше времени, чем session.timeout.ms, на вызов другого poll().Разделение тактового импульса для определенного потока отделяет процесс обработки от сообщения кластеру о том, что вы работаете и работаете, но это создает риск возникновения «живых» блокировок, в которых процесс активен, но не прогрессирует, так что помимо того, что пульс становится независимымиз poll был введен новый тайм-аут, чтобы гарантировать, что потребитель жив и прогрессирует.В документации говорится о реализации до KIP-62:

Пока потребитель отправляет тактовые импульсы, он в основном удерживает блокировку назначенных ему разделов.Если процесс перестает функционировать таким образом, что он не может прогрессировать, но, тем не менее, продолжает посылать тактовые импульсы, то ни один другой участник в группе не сможет захватить разделы, что приводит к увеличению задержки.Однако тот факт, что сердцебиение и обработка выполняются в одном и том же потоке, гарантирует, что потребители должны добиться прогресса, чтобы сохранить свое назначение.Любая остановка, которая влияет на обработку, также влияет на сердцебиение.

Изменения, внесенные KIP-62, включают в себя:

Отключение тайм-аута обработки: мы предлагаем ввести отдельное локально принудительное выполнениетайм-аут для обработки записи и фоновый поток для сохранения активности сеанса до истечения этого тайм-аута.Мы называем этот новый тайм-аут как «тайм-аут процесса» и выставляем его в конфигурации потребителя как max.poll.interval.ms.Этот конфиг устанавливает максимальную задержку между клиентскими вызовами для poll ()

Из опубликованных вами журналов, я думаю, вы можете оказаться в такой ситуации, ваше приложение занимает больше времени, чем max.poll.interval.ms (5 мин.по умолчанию) для обработки 200 опрошенных записей.Если вы находитесь в этом сценарии, вы можете только еще больше уменьшить max.poll.records или увеличить max.poll.interval.ms.

PD:

Конфигурация max.poll.interval.ms, отображаемая в вашем журнале, относится к (по крайней мере) kafka 0.10.1.0, поэтому я предполагаю, что вы допустили небольшую ошибку.

Обновление

Поправьте меня, если я вас неправильно понял, но в своем последнем комментарии вы говорили, что создаете 25 потребителей (например, 25 org.apache.kafka.clients.consumer.KafkaConsumer, если вы использовали Java) и добавляете их вN разных тем, но с использованием одного и того же group.id.Если это правильно, вы будете видеть перебалансировку каждый раз, когда KafkaConsumer запускается или останавливается, потому что он отправит сообщение JoinGroup или LeaveGroup (см. Соответствующий протокол kafka ), который содержит group.idи member.id (member.id не является хостом, поэтому два потребителя, созданные в одном и том же процессе, будут по-прежнему иметь разные идентификаторы).Обратите внимание, что это сообщение не содержит информацию о подписке на темы (хотя эта информация должна быть у посредников, но kafka не использует ее для ребалансировки).Поэтому каждый раз, когда кластер получает JoinGroup или LeaveGroup для group.id X, он будет вызывать перебалансировку для всех потребителей с одинаковым group.id X.

Если вы запустите 25 потребителей сто же самое group.id вы будете видеть перебалансировку до тех пор, пока не будет создан последний потребитель, и соответствующая перебалансировка не прекратится (если вы продолжаете видеть это, вы, возможно, останавливаете потребителей).1054 *.

Если у нас есть два KafkaConsumer, использующие один и тот же group.id (работающий в одном и том же процессе или в двух разных процессах), и один из них закрыт, это вызывает перебалансировку в другом KafkaConsumer, даже если онибыли подписаны на разные темы.Я полагаю, что брокеры должны учитывать только group.id для перебалансировки, а не подписанные темы, соответствующие паре (group_id, member_id) LeaveGroupRequest, но мне интересно, является ли это ожидаемым поведением или это то, что должно быть улучшен? Я предполагаю, что это, вероятно, первый вариант, чтобы избежать более сложного перебалансирования в брокере, и учитывая, что решение очень простое, то есть просто используйте разные групповые идентификаторы для разных KafkaConsumer, которые подписываются на разные темы, даже если они работают в одном и том же процессе.


Когда происходит восстановление баланса, мы видим повторяющиеся сообщения

Это исключительное поведение, один потребитель потребляет сообщение, но перед фиксацией смещения запускается перебалансировка и происходит сбой при фиксации. Когда перебалансирование завершится, процесс, который будет иметь такое назначение темы, снова будет использовать сообщение (до успешной фиксации).

Я разделился на две группы, теперь проблема внезапно исчезла с последних 2 часов.

Здесь вы бьете по гвоздю, но если вы не хотите, чтобы какой-либо (можно избежать) ребалансировался, вы должны использовать разные group.id для каждой темы.

Вот отличный разговор о различных сценариях перебалансировки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...