Kafka Streams на Kubernetes: долгая ребалансировка после повторного развертывания - PullRequest
2 голосов
/ 08 мая 2020

Проблема

Мы используем StatefulSet для развертывания приложения Scala Kafka Streams в Kubernetes. Экземпляры имеют отдельные applicationId s, поэтому каждый из них полностью копирует входные topi c для обеспечения отказоустойчивости. По сути, это службы только для чтения, которые только читают в состоянии topi c и записывают его в хранилище состояний, откуда запросы клиентов обслуживаются через REST. Это означает, что группа потребителей всегда состоит только из одного экземпляра Kafka Streams в любой момент времени.

Наша проблема теперь в том, что при запуске скользящего перезапуска каждый экземпляр занимает около 5 минут. для запуска, где большую часть времени тратится на ожидание в состоянии REBALANCING. Я прочитал здесь , что Kafka Streams не отправляет запрос LeaveGroup, чтобы быстро вернуться после перезапуска контейнера без перебалансировки. Почему у нас это не работает и почему перебалансировка занимает так много времени, хотя applicationId идентичен? В идеале, чтобы минимизировать время простоя, приложение должно немедленно приступить к работе с того места, где оно оставалось после перезапуска.

Конфигурация

Вот некоторые конфигурации, которые мы изменили со значений по умолчанию:

properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "300000")
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")
// RocksDB config, see https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
properties.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])    

Вопросы / Конфиги по теме

  • Поможет ли уменьшение session.timeout.ms? Мы установили для него довольно большое значение, поскольку брокеры Kafka живут в другом центре обработки данных, а сетевые соединения иногда бывают не очень надежными.
  • Этот ответ предлагает уменьшить max.poll.interval.ms, поскольку это связано с таймаутом перебалансировки. Это правильно? Я не решаюсь менять это, так как это может иметь последствия для нормального режима работы нашего приложения.
  • Здесь упоминается конфигурации group.initial.rebalance.delay.ms для задержки перебалансировки во время развертывания - но это вызовет задержки и после восстановления после sh, не так ли?
  • Я также наткнулся на KIP-345 , который нацелен на устранение перебалансировки потребителей для состояния c членство полностью через group.instance.id, что было бы хорошо для нашего случая пользователя, но, похоже, оно еще не доступно для наших брокеров.

Меня смущает множество конфигураций и как их использовать для быстрого восстановления после обновления. Может кто-нибудь объяснить, как они вместе играют?

1 Ответ

2 голосов
/ 11 мая 2020

Другой вопрос, который вы цитируете, не говорит о том, что ребалансировки можно избежать при перезапуске. Отсутствие отправки LeaveGroupRequest позволяет избежать перебалансировки только после остановки приложения. Следовательно, количество перебалансировок сокращается с двух до одного. Конечно, с вашим несколько необычным развертыванием с одним экземпляром вы ничего здесь не получите (на самом деле, это может «навредить» вам ...) .timeout.ms? Мы установили для него довольно большое значение, так как брокеры Kafka находятся в другом центре обработки данных, а сетевые соединения иногда не очень надежны.

Может быть, в зависимости от того, как быстро вы перезапустите приложение. (Более подробная информация ниже.) Может быть, просто попробуйте (ie, установите его на 3 минуты, чтобы по-прежнему было высокое значение для стабильности, и посмотрите, как время перебалансировки упадет до 3 минут?

Это В ответе предлагается уменьшить max.poll.interval.ms, так как это связано с таймаутом перебалансировки. Верно? Я не решаюсь изменить это, так как это может иметь последствия для нормального режима работы нашего приложения.

max.poll.interval.ms также влияет на время перебалансировки (подробности ниже). Однако значение по умолчанию составляет 30 секунд, поэтому время перебалансировки не должно составлять 5 минут.

Есть упоминание конфигурации group.initial.rebalance.delay.ms, чтобы отложить перебалансировку во время развертывания - но это вызовет задержки также после восстановления из cra sh, не так ли?

Это только применяется к пустым группам потребителей, а значение по умолчанию составляет всего 3 секунды. Так что это не должно повлиять на вас.

Я также наткнулся на KIP-345, цель которого - устранить перебалансировку потребителей для st ati c членство полностью через group.instance.id, что было бы хорошо для нашего случая пользователя, но, похоже, оно еще не доступно для наших брокеров.

Использование stati c членство в группе может быть лучшим выбором. Возможно, стоит обновить ваших брокеров, чтобы получить эту функцию.

Кстати, разница между session.timeout.ms и max.poll.interval.ms объясняется в другом вопросе: Разница между session.timeout.ms и max.poll .interval.ms для Kafka 0.10.0.0 и более поздних версий

Как правило, координатор группы на стороне брокера поддерживает список всех участников для каждого «поколения группы». Перебалансировка запускается, если участник активно покидает группу (отправляя LeaveGroupRequest), истекает время ожидания (через session.timeout.ms или max.poll.interval.ms) или новый участник присоединяется к группе. Если происходит перебалансировка, каждый участник получает шанс снова присоединиться к группе, чтобы быть включенным в следующее поколение.

В вашем случае в группе только один участник. Когда вы останавливаете свое приложение, LeaveGroupRequest не отправляется, и, таким образом, координатор группы удалит этого участника только после того, как session.timeout.ms пройден.

Если вы перезапустите приложение, он вернется как «новый» участник (с точки зрения координатора группы). Это вызовет отказ, давая возможность всем членам группы снова присоединиться к группе. В вашем случае «старый» экземпляр может все еще находиться в группе, и поэтому перебалансировка будет продвигаться вперед только после того, как координатор группы удалил старого участника из группы. Проблема может заключаться в том, что координатор группы думает, что группа расширяется с одного до двух членов ... (Это то, что я имел в виду выше: если будет отправлено LeaveGroupRequest, группа станет пустой, когда вы остановите вас. app, и при перезапуске в группе будет только новый член, и перебалансировка сразу же продвинется вперед.) идентифицируется как "старый" экземпляр, и координатору группы не нужно ждать истечения срока действия старого члена группы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...