Я встречаю странную вещь о восстановлении баланса Кафки.Если я увеличу разделы по теме, на которую подписаны некоторые потребители java (в одной группе), перебалансировок потребителей не произойдет.После этого я пытаюсь вызвать перебалансировку, запустив нового потребителя (или убить одного), и новые увеличенные разделы не могут быть назначены в этом перебалансировании.Я обнаружил, что новые разделы могут быть назначены только после того, как я остановил всех потребителей и запустил их.Я не знаю, нормально ли это или есть какое-либо объяснение этому.
Ниже мой тест на моем компьютере:
1. Запустите Kafka, ZK.Создайте обычную тему ( test-topic ) с 1 разделом
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000
2. Запустите 2 Java-клиента ( C1 , C2 )подписаться test-topic
3.Увеличить 2 раздела test-topic
$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
В не происходит перебалансировкиC1 , C2
4.Начните нового потребителя C3 в подписанной тестовой теме.Произошел перебаланс, но только переназначается только раздел test-topic-0 , не задействованы test-topic-1 или test-topic-2 .
5. Я пытаюсь восстановить баланс, останавливая C2 и C3 .Однако test-topic-1 и test-topic-2 по-прежнему не назначены.
6. Остановите всех работающих потребителей, а затем запустите их.Все test-topic-0,1,2 назначаются нормально.
kafka & java api version : kafka_2.12-2.0.0 (я также пробовал kafka_2.11-1.0.0 и kafka_2.10-0.10.2.1, тот же результат)
zookeeper : 3.4.13
код потребителя :
public class KafkaConsumerThread extends Thread {
// consumer settings
public static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBootstrap);
props.put("group.id", groupName);
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", true);
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
@Override
public void run() {
log.info("Start consumer ..");
consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
while (!stop) {
try {
ConsumerRecords<String, String> records = consumer.poll(100);
receivedRecordNumber.addAndGet(records.count());
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
log.info("Receive [key:{}][value:{}]", record.key(), record.value());
}
} catch (TimeoutException e) {
log.info("no data");
}
}
consumer.close();
}
}
Спасибо за комментарий @Aftab Virtual.Я снова тестирую и жду дольше.Примерно через 5 минут после запуска первого потребителя перебалансировка автоматически поднялась, и все разделы test-topic-0,1,2 переназначились.Таким образом, у Kafka автоматически восстанавливается баланс после изменения разделов.
Кроме того, я следовал совету @Aftab Virtual изменить значение leader.imbalance.check.interval.seconds
на 30. Однако перебалансировка всех разделов происходит примерно через 3 минуты после увеличения раздела.Я добавляю настройки для брокера:
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 30
Я не знаю, каков механизм этого перебалансирования.И больше нет логов для этого перебаланса:
[2018-10-18 11:32:47,958] INFO [GroupCoordinator 0]: Preparing to rebalance group test-group with old generation 4 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,963] INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,964] INFO [GroupCoordinator 0]: Assignment received from leader for group test-group for generation 5 (kafka.coordinator.group.GroupCoordinator)