Как я могу использовать динамическую тему для Kafka Consumer - PullRequest
0 голосов
/ 06 ноября 2019

укажите ниже конфигурацию Kafka для локального сервера @Bean public ConsumerFactory consumerFactory () {Map config = new HashMap <> ();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
    ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}



here I need to change the topic as a dynamic content
@KafkaListener(topics = "mytopic")
private void consume2(String Data) {
    String userID = null;
    String topic = null;
    String message = null;
    System.out.println(Data);
}

1 Ответ

0 голосов
/ 06 ноября 2019

Вы пробовали что-то вроде

@ KafkaListener (themes = "$ {kafka.topics}")

И задали его как переменную среды или свойство приложения?

Переменная окружения KAFKA_TOPICS = mytopic application.properties kafka.topics = mytopic

...