У меня 3 очереди.
Основная очередь
Повторная попытка очереди (3 раза контролируется кодом)
Очередь для сохранения сообщений об ошибках после 3 попытки.
Работает нормально ... но если я отправил неправильное сообщение json по любой причине, например:
{
"name":"alan"," <<<< this ," is wrong for example
"age":29,
}
мой слушатель не пытается обработать сообщение (Я хотел бы перехватить это сообщение, потому что если я получу сообщение об ошибке, я отправлю его в очередь ошибок)
Я получил ListenerExecutionFailedException, а затем потерял это сообщение.
Я попытался добавить обработчик ошибок в моя конфигурация:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get();
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory(properties, connectionNameStrategy));
factory.setMessageConverter(messageConverter());
**factory.setErrorHandler(errorHandler());**
RabbitProperties.Listener listener = properties.getListener();
if (listener != null && listener.getSimple() != null) {
map.from(listener.getSimple()::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(listener.getSimple()::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(listener.getSimple()::getDefaultRequeueRejected).whenNonNull().to(factory::setDefaultRequeueRejected);
}
return factory;
}
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new ListenerExceptionHandler());
}
@Log4j2
public class ListenerExceptionHandler extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
log.error("Failed to process inbound message from queue {}, failed message={}",
lefe.getFailedMessage().getMessageProperties().getConsumerQueue(),
lefe.getFailedMessage());
}
return super.isFatal(t);
}
}
Я просто хотел бы, если возможно, перехватить это ListenerExecutionFailedException на моем слушателе ... возможно?
потому что в моем классе обработчика я ничего не могу сделать ... просто установить журналы .. Я не могу отправить это сообщение в очередь ошибок.
слушатель:
@RabbitListener(queues = Queues.MAIN, concurrency = "2")
public void listenerMessage(Message message,@Header(name = "x-death", required = false) List<Map<String, ?>> xDeath) {
log.info("ProcessMessage Received:: {}", message.getPayload());
validateReceivedMessage(xDeath, message);
}
Я пытался добавить try / catch вроде:
try {
}catch (ListenerExecutionFailedException e){
}
без успеха.
Есть предложения? спасибо