Настройка фильтра тем Кафки - PullRequest
0 голосов
/ 12 июня 2018

Я использую этот учебник для настройки проекта.Все работает хорошо, пока я не добавлю фабрику контейнеров клиента на моем приемникеВот мой KafkaReciverConfig.

@EnableKafka
@Configuration
public class ReceiverConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(GROUP_ID_CONFIG, "app.topic");
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, TopicPayload> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(TopicPayload.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TopicPayload> filterKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, TopicPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

А вот мой Receiver.liste метод

@KafkaListener(id = "app.topic", topics = "${app.topic.topicname}", containerFactory = "filterKafkaListenerContainerFactory")
public void listen(@Payload TopicPayload payload) {
    LOGGER.info("-------------------- " + payload);
}

Если я не укажу containerFactory, он отлично работает.но как только я укажу (я планирую добавить здесь логику фильтрации), я получаю следующую ошибку

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory


Action:

1 Ответ

0 голосов
/ 12 июня 2018

Эта ошибка вызвана классом KafkaAnnotationDrivenConfiguration.

В классе KafkaAnnotationDrivenConfiguration, если не существует bean-компонента с именем "kafkaListenerContainerFactory", зарегистрируйте kafkaListenerContainerFactory в качестве метода bean.

* 1004 - код метода Bean *.зарегистрировать bean-компонент kafkaListenerContainerFactory в KafkaAnnotationDrivenConfiguration.
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}

Этот метод принимает ConsumerFactory <Object, Object> в качестве аргумента.Однако ConsumerFactory, с которой вы регистрируетесь, - это ConsumerFactory <String, TopicPayload>, поэтому Spring не находит bean-компонент типа ConsumerFactory <Object, Object> и выдает исключение

. Решение состоит в том, чтобы не регистрировать ConsumerFactory и consumerConfigs в качестве bean-компонентов.В этом случае ConsumerFactory будет автоматически зарегистрирован в качестве компонента KafkaAutoConfiguration. (KafkaAutoConfiguration будет автоматически регистрировать компонент, только если нет компонента типа ConsumerFactory.)

public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(GROUP_ID_CONFIG, "app.topic");
    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

public ConsumerFactory<String, TopicPayload> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(TopicPayload.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, TopicPayload> filterKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, TopicPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

Более простой метод состоит в использованииимя kafkaListenerContainerFactory вместо имени filterKafkaListenerContainerFactory.

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(GROUP_ID_CONFIG, "app.topic");
    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

@Bean
public ConsumerFactory<String, TopicPayload> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(TopicPayload.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, TopicPayload> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, TopicPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

Это немного сложно объяснить.Если вы знаете автоконфигурацию загрузки Spring, это будет легко понять.

Я надеюсь, что мой образец работает.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...