Подключение к нескольким кластерам Kafka с помощью Spring Kafka - PullRequest
4 голосов
/ 19 июня 2020

У меня есть приложение весенней загрузки, которое потребляет сообщения от топи c (скажем, topic1) в кластере Kafka. Так сейчас выглядит мой код.

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
         return new NewTopic("baeldung", 1, (short) 1);
    }
}
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
@KafkaListener(topics = "topicName", groupId = "foo")
public void listen(String message) {
    System.out.println("Received Messasge in group foo: " + message);
}

Теперь я хочу начать потребление из другого топи c в другом кластере Kafka. Один из способов - создать для этого еще один компонент. Но есть ли лучший способ сделать это?

1 Ответ

1 голос
/ 30 июня 2020

Вам нужны другие ConsumerFactory и KafkaListenerContainerFactory, которые подключаются к серверам bootstrap другого кластера.

Затем вы можете использовать containerFactory в аннотации @KafkaListener .

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> otherClusterFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(otherConsumerFactory());
    return factory;
}

...

@KafkaListener(...., containerFactory="otherClusterFactory")
...