Весенние свойства kafka не загружаются автоматически при написании customConsumerFactory и customKafkaListenerContainerFactory - PullRequest
2 голосов
/ 25 марта 2020

Я хочу загрузить свои свойства spring-kafka из application.properties, и они должны быть загружены с помощью автоконфигурации Spring. Моя проблема вызвана: java .lang.IllegalStateException: Нет подтверждения, доступного в качестве аргумента, у контейнера слушателя должен быть MANUAL AckMode, чтобы заполнить подтверждение, однако я уже установил его в файле свойств spring.kafka.listener.ack- mode = manual-немедленный в этих свойствах, потому что это мой пользовательский fooKafkaListenerContainerFactory. Невозможно выбрать эти настройки. То, что я хочу, не устанавливая это вручную, это должно быть взято из моего application.properies. @ Гари Рассел, ваша помощь приветствуется.

Мой код выглядит ниже

package com.foo;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.foo.FooKafkaDTO;

@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {

        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, FooKafkaDTO> fooConsumerFactory() {

        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(FooKafkaDTO.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, FooKafkaDTO> fooKafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<String, FooKafkaDTO> fooConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, FooKafkaDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fooConsumerFactory());
        return factory;
    }
}


Here are my properties 

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable.auto.commit=false
spring.kafka.consumer.key-deserialize=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserialize=org.springframework.kafka.support.serializer.JsonDeserializer


Here is my listener

@Service
public class Consumer {

    private static final Log LOG = LogFactory.getLog(Consumer.class);

    @KafkaListener(
            topicPartitions = {@TopicPartition(topic = "outbox.foo",
                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))},
            groupId = "group_id",
            containerFactory = "fooKafkaListenerContainerFactory")
    public void consume(@Payload FooKafkaDTO fooKafkaDTO, Acknowledgment acknowledgment,
            @Headers MessageHeaders headers) {

        LOG.info("offset:::" + Long.valueOf(headers.get(KafkaHeaders.OFFSET).toString()));
        LOG.info(String.format("$$ -> Consumed Message -> %s", fooKafkaDTO));
        acknowledgment.acknowledge();

    }
}

1 Ответ

1 голос
/ 01 апреля 2020

После просмотра документации по spring-kafka spring-kafka-official-Документация ! Я мог бы найти этот код, который заменил весь шаблонный код. Я упростил свой класс KafkaConsumerConfig, и теперь он выглядит следующим образом.

package com.foo

import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.foo.FooKafkaDTO;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public DefaultKafkaConsumerFactory fooDTOConsumerFactory(KafkaProperties properties) {

        Map<String, Object> props = properties.buildConsumerProperties();
        return new DefaultKafkaConsumerFactory(props,
                new JsonDeserializer<>(String.class)
                        .forKeys()
                        .ignoreTypeHeaders(),
                new JsonDeserializer<>(FooKafkaDTO.class)
                        .ignoreTypeHeaders());

    }
}
...