Эта ошибка вызвана классом KafkaAnnotationDrivenConfiguration.
В классе KafkaAnnotationDrivenConfiguration, если не существует bean-компонента с именем "kafkaListenerContainerFactory", зарегистрируйте kafkaListenerContainerFactory в качестве метода bean.
* 1004 - код метода Bean *.зарегистрировать bean-компонент kafkaListenerContainerFactory в KafkaAnnotationDrivenConfiguration.
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
Этот метод принимает ConsumerFactory <Object, Object>
в качестве аргумента.Однако ConsumerFactory, с которой вы регистрируетесь, - это ConsumerFactory <String, TopicPayload>
, поэтому Spring не находит bean-компонент типа ConsumerFactory <Object, Object>
и выдает исключение
. Решение состоит в том, чтобы не регистрировать ConsumerFactory и consumerConfigs в качестве bean-компонентов.В этом случае ConsumerFactory будет автоматически зарегистрирован в качестве компонента KafkaAutoConfiguration. (KafkaAutoConfiguration будет автоматически регистрировать компонент, только если нет компонента типа ConsumerFactory.)
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(GROUP_ID_CONFIG, "app.topic");
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
public ConsumerFactory<String, TopicPayload> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(TopicPayload.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, TopicPayload> filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, TopicPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Более простой метод состоит в использованииимя kafkaListenerContainerFactory вместо имени filterKafkaListenerContainerFactory.
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(GROUP_ID_CONFIG, "app.topic");
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, TopicPayload> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(TopicPayload.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, TopicPayload> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, TopicPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Это немного сложно объяснить.Если вы знаете автоконфигурацию загрузки Spring, это будет легко понять.
Я надеюсь, что мой образец работает.