Мой обработчик ошибок kafka не вызывается, я должен вызывать явно? - PullRequest
0 голосов
/ 20 мая 2019

Я разрабатываю потребительский API kafka, который использует мои сообщения из темы.Когда он использует неправильное сообщение (скажем, неправильно сформированное сообщение JSON), я ожидаю, что мой обработчик ошибок должен быть вызван, чтобы уведомить группу поддержки о некоторых действиях с неправильным сообщением.

Но мой обработчик ошибок невызывается автоматически.Подскажите пожалуйста, чего не хватает в моем коде.

Если я автоматически подключу свой обработчик ошибок к своему классу слушателя и вызову явным образом, все будет работать нормально.

Класс обработчика ошибок

public class MyErrorHandler implements ErrorHandler, KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException ex, Consumer<?, ?> consumer) {
.....
   }
}

Конфигурация потребителя

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerGroupFactory());
    //factory.getContainerProperties().setPollTimeout(3000);
    //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    //factory.getContainerProperties().setAckOnError(false);
    factory.setErrorHandler(new MyErrorHandler());
    return factory;
}

@Bean
public MyErrorHandler myErrorHandler() {
    return new MyErrorHandler();
}

Класс прослушивателя

@KafkaListener(topics = "${kafka.proposal.topic.name}" ,                containerFactory = "kafkaManualAckListenerContainerFactory",                errorHandler ="${kafka.custom.error.handler}")
public void listen(ConsumerRecord<?,?> cr)  {   
     //Logic to get the message from topic and parse it to json, here i am testing with incorrect messages and producing JsonSyntaxException
}

Примечание: kafka.custom.error.handler =myErrorHandler в моем файле свойств.

Я ожидаю, что мой обработчик ошибок должен вызываться автоматически.Но это не так.Мне не хватает какой-либо конфигурации.

1 Ответ

0 голосов
/ 20 мая 2019

Если возникает ошибка десериализации, например, в JsonDeserializer это происходит до того, как Spring получает сообщение, поэтому мы не можем вызвать обработчик ошибок.

Начиная с версии 2.2, вы можете использовать ErrorHandlingDeserializer2 , чтобы обернуть десериализатор. Это позволяет платформе получить неудачную десериализацию и передать ее обработчику ошибок контейнера.

Когда десериализатору не удается десериализовать сообщение, Spring не может решить проблему, потому что это происходит до того, как функция poll () вернется. Чтобы решить эту проблему, версия 2.2 представила ErrorHandlingDeserializer2. Этот десериализатор делегирует реальный десериализатор (ключ или значение). Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer2 возвращает нулевое значение и исключение DeserializationException в заголовке, который содержит причину и необработанные байты. Когда вы используете MessageListener уровня записи, если ConsumerRecord содержит заголовок DeserializationException для ключа или значения, контейнер ErrorHandler вызывается с ошибочным ConsumerRecord. Запись не передается слушателю.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...