SpringAMQP - Как отловить исключение ListenerExecutionFailedException? - PullRequest
0 голосов
/ 22 января 2020

У меня 3 очереди.

Основная очередь

Повторная попытка очереди (3 раза контролируется кодом)

Очередь для сохранения сообщений об ошибках после 3 попытки.

Работает нормально ... но если я отправил неправильное сообщение json по любой причине, например:

{
"name":"alan","  <<<< this ," is wrong for example
"age":29,
}

мой слушатель не пытается обработать сообщение (Я хотел бы перехватить это сообщение, потому что если я получу сообщение об ошибке, я отправлю его в очередь ошибок)

Я получил ListenerExecutionFailedException, а затем потерял это сообщение.

Я попытался добавить обработчик ошибок в моя конфигурация:

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
        PropertyMapper map = PropertyMapper.get();
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory(properties, connectionNameStrategy));
        factory.setMessageConverter(messageConverter());
        **factory.setErrorHandler(errorHandler());**
        RabbitProperties.Listener listener = properties.getListener();
        if (listener != null && listener.getSimple() != null) {
            map.from(listener.getSimple()::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
            map.from(listener.getSimple()::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
            map.from(listener.getSimple()::getDefaultRequeueRejected).whenNonNull().to(factory::setDefaultRequeueRejected);

        }
        return factory;
    }

public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new ListenerExceptionHandler());
    }

@Log4j2
public class ListenerExceptionHandler extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

    @Override
    public boolean isFatal(Throwable t) {
        if (t instanceof ListenerExecutionFailedException) {
            ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
            log.error("Failed to process inbound message from queue {}, failed message={}",
                    lefe.getFailedMessage().getMessageProperties().getConsumerQueue(),
                    lefe.getFailedMessage());
        }
        return super.isFatal(t);
    }

}

Я просто хотел бы, если возможно, перехватить это ListenerExecutionFailedException на моем слушателе ... возможно?

потому что в моем классе обработчика я ничего не могу сделать ... просто установить журналы .. Я не могу отправить это сообщение в очередь ошибок.

слушатель:

@RabbitListener(queues = Queues.MAIN, concurrency = "2")
public void listenerMessage(Message message,@Header(name = "x-death", required = false) List<Map<String, ?>> xDeath) {
    log.info("ProcessMessage Received:: {}", message.getPayload());
    validateReceivedMessage(xDeath, message);
}

Я пытался добавить try / catch вроде:

try {

}catch (ListenerExecutionFailedException e){

}

без успеха.

Есть предложения? спасибо

1 Ответ

0 голосов
/ 22 января 2020

Нет; исключения преобразования сообщений происходят слишком далеко в стеке; нет способа вызвать слушателя с сообщением, которое не удалось преобразовать.

Такие исключения считаются фатальными, чтобы избежать бесконечного цикла; вам необходимо настроить обмен недоставленными буквами и ключ маршрутизации в исходной очереди, чтобы сообщение с ошибкой направлялось туда.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...