Обработка ошибок Spring-kafka с помощью DeadLetterPublishingRecoverer - PullRequest
1 голос
/ 06 апреля 2020

Я пытаюсь реализовать обработку ошибок в Spring boot kafa. В моем слушателе Kafka я выдаю исключение во время выполнения, как показано ниже:

@KafkaListener(topics= "Kafka-springboot-example", groupId="group-employee-json")
    public void consumeEmployeeJson(Employee employee) {
        logger.info("Consumed Employee JSON: "+ employee);

        if(null==employee.getEmployeeId()) {
            throw new RuntimeException("failed");
            //throw new ListenerExecutionFailedException("failed");
        }
    }

И я настроил обработку ошибок, как показано ниже:

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template){

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template)));

        return factory;

    }
}

И мой слушатель для DLT соответствует ниже:

@KafkaListener(topics= "Kafka-springboot-example.DLT", groupId="group-employee-json")
    public void consumeEmployeeErrorJson(Employee employee) {
        logger.info("Consumed Employee JSON frpm DLT topic: "+ employee);
    }

Но мое сообщение не публикуется в DLT topi c.

Есть идеи, что я делаю неправильно?

Отредактировано:

application.properties

server.port=8088

#kafka-producer-config
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer


#Kafka consumer properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-employee-json
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

1 Ответ

1 голос
/ 06 апреля 2020

public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(

Если вы используете нестандартное имя бина для фабрики контейнеров, вам нужно установить его на @KafkaListener в свойстве containerFactory.

имя компонента по умолчанию kafkaListenerContainerFactory, которое автоматически конфигурируется Boot. Вам необходимо либо переопределить этот компонент, либо настроить прослушиватель так, чтобы он указывал на ваше нестандартное имя компонента.

...