Добавление раздела Spring в загрузочную кафку займет некоторое время, чтобы - PullRequest
2 голосов
/ 01 июля 2019

Я изменяю тему Кафки, добавляю к ней раздел (от 3 до 4 разделов). На потребителя у меня уже есть 4 параллелизма, использующих @KafkaListener

@KafkaListener(topics = "t_multi_partitions", concurrency = "4")

Производитель отправляет сообщение каждые несколько секунд, но обычно менее 5 секунд для каждого сообщения.

Самое смешное, когда я изменяю раздел

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic t_multi_partitions --partitions 4

Он не вступает в силу напрямую.
Потребовалось примерно 5 минут, чтобы производитель начал отправлять сообщение в 4-й раздел, а также 5 минут для восстановления баланса, чтобы 4-й потребитель вступил в силу

Это нормально? Я использую Spring boot 2.1 Как я могу настроить это 5 минут времени (больше или меньше)?

Код потребителя (упрощенно)

@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
public void listen(ConsumerRecord<String, String> message) throws InterruptedException {
    log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
}

Ответы [ 2 ]

2 голосов
/ 01 июля 2019

Это контролируется Apache-Kafka, а не Spring-Kafka. Поведение нормальное.

Вы можете изменить частоту обновления, настроив metadata.max.age.ms. Это производитель и потребительский конфиг. Конфигурация производителя и потребителя не обязательно должна иметь одинаковое значение конфигурации.

Цитата из документации Apache-Kafka .

Период времени в миллисекундах, после которого мы принудительно обновляем метаданные, даже если мы не видели каких-либо изменений в руководстве раздела заранее обнаруживать любых новых брокеров или разделов.

Spring-Kafka в вашем примере работает с четырьмя @KafkaListener потоками. Темы и разделы делегируются каждому @KafkaListener на основе стратегии присваивания, определенной partition.assignment.strategy. RangeAssignor является стратегией по умолчанию. Это потребительский конфиг.

Цитата из документации Apache-Kafka.

Имя класса стратегии назначения разделов, которую клиент будет использовать для распределения владения разделами среди пользовательских экземпляров когда используется групповое управление

0 голосов
/ 02 июля 2019

На основе руководства @kkflf, конфигурация производителя

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        var props = kafkaProperties.buildProducerProperties();
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "600000");

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

}
...