Изменение фабричного компонента контейнера по умолчанию с kafkaListenerContainerFactory на мою фабрику пользовательских контейнеров - PullRequest
2 голосов
/ 28 января 2020

В документах spring-kafka мы можем прочитать, что фабрика контейнеров по умолчанию предполагается доступной с именем компонента kafkaListenerContainerFactory, если в конфигурации не было указано явное значение по умолчанию.

Я хотел бы спросить, можно ли изменить конфигурацию, чтобы использовать мой фабричный компонент (например, customKafkaListenerContainerFactory), а не kafkaListenerContainerFactory?

Пример кода -> если мы наберем

@KafkaListener(id = "cat", topics = "myTopic")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

, тогда bean-компонентом containerFactory по умолчанию будет customKafkaListenerContainerFactory, а не kafkaListenerContainerFactory

Чтобы быть более точным -> если я не предоставлю какой-либо атрибут containerFactory, тогда используется customKafkaListenerContainerFactory НЕ kafkaListenerContainerFactory

1 Ответ

1 голос
/ 28 января 2020

Да, вы можете с помощью атрибута containerFactory в аннотации @KafkaListener, вы можете установить пользовательский фабричный bean-компонент контейнера kafka

Имя компонента KafkaListenerContainerFactory, используемого для создания контейнер получателя сообщений, отвечающий за обслуживание этой конечной точки.

@KafkaListener(id = "cat", topics = "myTopic", containerFactory="customKafkaListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
   ...
  ack.acknowledge();
}

Или вы можете переопределить значение по умолчанию kafkaListenerContainerFactory в классе Config. И, как сказал @Gary Russell, просто используйте одно и то же имя бина, оно заменит Boot's, что обусловлено наличием бина с таким именем

@Configuration
@EnableKafka
public class Config {

   @Bean
   @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
   ConcurrentKafkaListenerContainerFactory<Integer, String>
                    kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory,consumerFactory());
    // set custom properties
    return factory;
   }

   @Bean
   public ConsumerFactory<Integer, String> consumerFactory() {
      return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }

   @Bean
   public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    ...
    return props;
  }
}
...