Можно ли попросить Кафку подождать меньше чем max.poll.interval.ms во время JoinGroup? - PullRequest
0 голосов
/ 15 апреля 2020

Справочная информация: У меня есть приложение, работающее в Kubernetes, которое использует Kafka в качестве централизованной шины сообщений. Клиенты Kafka в моем приложении могут быть довольно медленными. Максимальное время, в течение которого брокер Kafka будет ожидать между последующими вызовами poll (), прежде чем выкинуть члена из группы и перебалансировать, контролируется max.poll.interval.ms.

. Для большинства работников этого приложения я могу установите max.poll.interval.ms на порядок порядка нескольких минут. Однако для работников младших классов мне нужно установить это значение на несколько часов.

Когда все работает нормально, это не вызывает проблем. Однако, в случае нарушения работы сети или периодических сбоев, я заметил, что работники с очень большими максимальными интервалами опроса могут «застрять» в восстановлении баланса. Если я посмотрю на брокера, когда это произойдет, и выполню что-то вроде

kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe --members

, то увижу, что брокер ждет кучу рабочих, которых просто больше не существует (я уверен, что это Дело в том, что я установил group.instance.id для имени хоста pod Kubernetes, чтобы я мог убедиться, что застрявшие члены группы действительно исчезли.

Через этот вопрос , я вижу, что В KIP-266 говорится, что « API JoinGroup будет рассматриваться как особый случай, и его таймаут будет установлен на значение, полученное из max.poll.interval.ms. » То, что, как мне кажется, происходит, в том, что мои работники отправляют JoinGroup непосредственно перед тем, как каким-то образом отключиться от брокера, и это заставляет брокера ждать полные max.poll.interval.ms, прежде чем пометить их как мертвых, и позволяет новым работникам быть перебалансированными.

Когда это происходит, мне кажется, что я должен убить брокеров Kafka и вернуть их обратно, чтобы убрать мертвых членов ... или же все процессы зависают в течение нескольких часов, пока брокер ожидает полный тайм-аут. Оба эти решения плохие, и я тоже не доволен.

Мой вопрос: Есть ли настройка, которую я могу настроить, чтобы заставить Кафку ждать меньше max.poll.interval.ms, прежде чем дать на запрос JoinGroup? Если это означает, что после перебоя в сети наблюдается небольшой отток в балансировке, так как очень медленные потребители поздно возвращаются в группу, тогда я в порядке. Если такого механизма нет, как мне провести рефакторинг моей системы, чтобы избежать проблем, с которыми я сталкиваюсь?

Я использую Confluent Kafka confluentinc/cp-kafka:5.4.1, который выглядит как Kafka 2.4.0.

Ответы [ 3 ]

2 голосов
/ 15 апреля 2020

В Kafka, когда начинается перебалансировка в группе потребителей, все потребители в этой группе потребителей отменяются, и Kafka ждет всех живых потребителей (потребителей, отправляющих пульс) в poll () (вызов опроса для отмененного потребителя означает JoinGroupRequest). Важно то, что:

rebalance timeout = max.poll.interval.ms

, и это нельзя изменить . На самом деле это разумно, потому что Кафка ждет живых потребителей, чтобы завершить свою работу и вновь присоединиться к группе. Таким образом, перебалансировка завершается, когда все активные потребители отправляют joinGroupRequests или происходит тайм-аут перебалансировки.

Во время перебалансировки, поскольку все потребители в группе потребителей отменены, операция потребления для этой группы потребителей прекращается. Поэтому в качестве хорошей практики следует избегать длительных процессов.

В результате:

длительный запуск процессов -> длительный отсчет времени max.poll.interval.ms -> длительное время восстановления баланса

0 голосов
/ 29 апреля 2020

Моим окончательным решением было перейти на Apache Pulsar.

Pulsar позволяет индивидуально проверять сообщения, что решает проблему.

0 голосов
/ 16 апреля 2020

Я не решил это (и кажется, что решения может и не быть), но я мог бы найти способ немного улучшить ситуацию: установить group.instance.id на имя хоста Kubernetes, а также использовать StatefulSets в Kubernetes, так что имя хоста для конкретного работника стабильно. Таким образом, когда работник падает и возвращается, мы надеемся, что Кафка сможет распознать его как того же работника, вместо того, чтобы торчать в ожидании призрака.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...