Справочная информация: У меня есть приложение, работающее в Kubernetes, которое использует Kafka в качестве централизованной шины сообщений. Клиенты Kafka в моем приложении могут быть довольно медленными. Максимальное время, в течение которого брокер Kafka будет ожидать между последующими вызовами poll (), прежде чем выкинуть члена из группы и перебалансировать, контролируется max.poll.interval.ms
.
. Для большинства работников этого приложения я могу установите max.poll.interval.ms
на порядок порядка нескольких минут. Однако для работников младших классов мне нужно установить это значение на несколько часов.
Когда все работает нормально, это не вызывает проблем. Однако, в случае нарушения работы сети или периодических сбоев, я заметил, что работники с очень большими максимальными интервалами опроса могут «застрять» в восстановлении баланса. Если я посмотрю на брокера, когда это произойдет, и выполню что-то вроде
kafka-consumer-groups --bootstrap-server localhost:9092 --group my-group --describe --members
, то увижу, что брокер ждет кучу рабочих, которых просто больше не существует (я уверен, что это Дело в том, что я установил group.instance.id
для имени хоста pod Kubernetes, чтобы я мог убедиться, что застрявшие члены группы действительно исчезли.
Через этот вопрос , я вижу, что В KIP-266 говорится, что « API JoinGroup будет рассматриваться как особый случай, и его таймаут будет установлен на значение, полученное из max.poll.interval.ms. » То, что, как мне кажется, происходит, в том, что мои работники отправляют JoinGroup непосредственно перед тем, как каким-то образом отключиться от брокера, и это заставляет брокера ждать полные max.poll.interval.ms
, прежде чем пометить их как мертвых, и позволяет новым работникам быть перебалансированными.
Когда это происходит, мне кажется, что я должен убить брокеров Kafka и вернуть их обратно, чтобы убрать мертвых членов ... или же все процессы зависают в течение нескольких часов, пока брокер ожидает полный тайм-аут. Оба эти решения плохие, и я тоже не доволен.
Мой вопрос: Есть ли настройка, которую я могу настроить, чтобы заставить Кафку ждать меньше max.poll.interval.ms
, прежде чем дать на запрос JoinGroup? Если это означает, что после перебоя в сети наблюдается небольшой отток в балансировке, так как очень медленные потребители поздно возвращаются в группу, тогда я в порядке. Если такого механизма нет, как мне провести рефакторинг моей системы, чтобы избежать проблем, с которыми я сталкиваюсь?
Я использую Confluent Kafka confluentinc/cp-kafka:5.4.1
, который выглядит как Kafka 2.4.0.