Я разрабатываю потребительский API kafka, который использует мои сообщения из темы.Когда он использует неправильное сообщение (скажем, неправильно сформированное сообщение JSON), я ожидаю, что мой обработчик ошибок должен быть вызван, чтобы уведомить группу поддержки о некоторых действиях с неправильным сообщением.
Но мой обработчик ошибок невызывается автоматически.Подскажите пожалуйста, чего не хватает в моем коде.
Если я автоматически подключу свой обработчик ошибок к своему классу слушателя и вызову явным образом, все будет работать нормально.
Класс обработчика ошибок
public class MyErrorHandler implements ErrorHandler, KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException ex, Consumer<?, ?> consumer) {
.....
}
}
Конфигурация потребителя
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerGroupFactory());
//factory.getContainerProperties().setPollTimeout(3000);
//factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.getContainerProperties().setAckOnError(false);
factory.setErrorHandler(new MyErrorHandler());
return factory;
}
@Bean
public MyErrorHandler myErrorHandler() {
return new MyErrorHandler();
}
Класс прослушивателя
@KafkaListener(topics = "${kafka.proposal.topic.name}" , containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler ="${kafka.custom.error.handler}")
public void listen(ConsumerRecord<?,?> cr) {
//Logic to get the message from topic and parse it to json, here i am testing with incorrect messages and producing JsonSyntaxException
}
Примечание: kafka.custom.error.handler =myErrorHandler в моем файле свойств.
Я ожидаю, что мой обработчик ошибок должен вызываться автоматически.Но это не так.Мне не хватает какой-либо конфигурации.