Кафка: Увеличенные разделы не могут быть назначены при следующем перебалансировании - PullRequest
0 голосов
/ 17 октября 2018

Я встречаю странную вещь о восстановлении баланса Кафки.Если я увеличу разделы по теме, на которую подписаны некоторые потребители java (в одной группе), перебалансировок потребителей не произойдет.После этого я пытаюсь вызвать перебалансировку, запустив нового потребителя (или убить одного), и новые увеличенные разделы не могут быть назначены в этом перебалансировании.Я обнаружил, что новые разделы могут быть назначены только после того, как я остановил всех потребителей и запустил их.Я не знаю, нормально ли это или есть какое-либо объяснение этому.

Ниже мой тест на моем компьютере:

1. Запустите Kafka, ZK.Создайте обычную тему ( test-topic ) с 1 разделом

./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000

2. Запустите 2 Java-клиента ( C1 , C2 )подписаться test-topic

3.Увеличить 2 раздела test-topic

$ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3

В не происходит перебалансировкиC1 , C2

4.Начните нового потребителя C3 в подписанной тестовой теме.Произошел перебаланс, но только переназначается только раздел test-topic-0 , не задействованы test-topic-1 или test-topic-2 .

5. Я пытаюсь восстановить баланс, останавливая C2 и C3 .Однако test-topic-1 и test-topic-2 по-прежнему не назначены.

6. Остановите всех работающих потребителей, а затем запустите их.Все test-topic-0,1,2 назначаются нормально.

kafka & java api version : kafka_2.12-2.0.0 (я также пробовал kafka_2.11-1.0.0 и kafka_2.10-0.10.2.1, тот же результат)

zookeeper : 3.4.13

код потребителя :

public class KafkaConsumerThread extends Thread {
    // consumer settings
    public static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBootstrap);
        props.put("group.id", groupName);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", true);
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    @Override
    public void run() {
        log.info("Start consumer ..");
        consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                receivedRecordNumber.addAndGet(records.count());
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while (iterator.hasNext()) {
                    ConsumerRecord<String, String> record = iterator.next();
                    log.info("Receive [key:{}][value:{}]", record.key(), record.value());
                }
            } catch (TimeoutException e) {
                log.info("no data");
            }
        }
        consumer.close();
    }
}

Спасибо за комментарий @Aftab Virtual.Я снова тестирую и жду дольше.Примерно через 5 минут после запуска первого потребителя перебалансировка автоматически поднялась, и все разделы test-topic-0,1,2 переназначились.Таким образом, у Kafka автоматически восстанавливается баланс после изменения разделов.

Кроме того, я следовал совету @Aftab Virtual изменить значение leader.imbalance.check.interval.seconds на 30. Однако перебалансировка всех разделов происходит примерно через 3 минуты после увеличения раздела.Я добавляю настройки для брокера:

auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 30

Я не знаю, каков механизм этого перебалансирования.И больше нет логов для этого перебаланса:

[2018-10-18 11:32:47,958] INFO [GroupCoordinator 0]: Preparing to rebalance group test-group with old generation 4 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,963] INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,964] INFO [GroupCoordinator 0]: Assignment received from leader for group test-group for generation 5 (kafka.coordinator.group.GroupCoordinator)

1 Ответ

0 голосов
/ 23 октября 2018

Получив совет от команды Kafka и некоторых пользователей Kafka, я получил объяснение моего результата теста.Это не ошибка.

Увеличение разделов будет помечено как metadata.updateNeeded = true.Однако это не будет запускать обновление до истечения следующего времени метаданных (по умолчанию metadata.max.age.ms составляет 5 * 60 * 1000 мс).До того, как руководитель группы обновит свои метаданные, перебалансировка, вызванная изменением номера потребителя, не затронет новые разделы.

Я уменьшил metadata.max.age.ms до 30 секунд, и Кафка стал более чувствительным к увеличению разделов.

...