Потребитель Кафки подбирает темы динамически - PullRequest
0 голосов
/ 31 января 2019

У меня настроен потребитель Kafka в Spring Boot.Вот класс конфигурации:

@EnableKafka
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, GenericData.Record> consumerFactory() {

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("schema.registry.url"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

А вот потребитель:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeAvro(GenericData.Record message) {
        messageProcessor.process();
    }

}

Обратите внимание, что я использую themes = "# {'$ {kafka.topics}'.split (',')} ", чтобы выбрать темы из файла свойств.Вот как выглядит мой файл kafka.properties:

kafka.topics=pwdChange,pwdCreation
bootstrap.servers=aaa.bbb.com:37900
group.id=pwdManagement
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
schema.registry.url=http://aaa.bbb.com:37800

Теперь, если я хочу добавить новую тему в подписку, произнесите pwdExpire и измените файлы поддержки следующим образом:

kafka.topics=pwdChange,pwdCreation,pwdExpire

Может ли мой потребитель начать подписку на эту новую тему, не перезагружая сервер?Я нашел это сообщение Spring Kafka - подписывайте новые темы во время выполнения , но в документации есть это, чтобы сказать о metadata.max.age.ms :

Период времени в миллисекундах, после которого мы принудительно обновляем метаданные, даже если мы не видели каких-либо изменений в руководстве по разделам для проактивного обнаружения каких-либо новых брокеров или разделов.т работа.Спасибо за вашу помощь!

1 Ответ

0 голосов
/ 31 января 2019

Нет;единственный способ сделать это - использовать шаблон темы;По мере добавления новых тем (соответствующих шаблону) брокер добавляет их в подписку по умолчанию через 5 минут.

Однако вы можете добавить новые контейнеры приемника для новой темы.(s) во время выполнения.

Другой вариант - загрузить бин @KafkaListener в дочерний контекст приложения и заново создавать контекст при каждом изменении темы.

РЕДАКТИРОВАТЬ

См. Javadocs для KafkaConsumer.subscribe(Pattern pattern) ...

/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topics existing at the time of check.
 * <p>
 ...
...