Spring AMQP как получить доступ к ответному сообщению при потере соединения при отправке с использованием @SendTo - PullRequest
0 голосов
/ 14 января 2020

у нас есть микросервис, который использует сообщение с помощью @RabbitListener и сохраняет данные в базе данных, генерирует ответ об успешной обработке сообщения и отправляет его с помощью @sendTO в другую очередь для аудита.

При запуске Rabbit в режиме отработки отказа HA при отправке ответа в случае потери соединения обрабатываемое в данный момент сообщение корректно возвращается в очередь, но транзакция базы данных (в нашем случае транзакция jpa) не откатывается, ответ никогда не отправляется .

Я прочитал из этого выпуска (https://github.com/spring-projects/spring-amqp/issues/696), что это синхронизация транзакций «1P C» с максимальным усилием; RabbitMQ не поддерживает транзакции XA. Rabbit tx фиксируется после DB tx, и существует вероятность, что DB tx может зафиксировать и кролик откатится назад; вам приходится иметь дело с небольшой вероятностью дублирования сообщений.

Но в нашем случае, когда мы повторяем запрос, мы рассматриваем его как дублирующееся сообщение, и ответ на этот запрос никогда не создается. Есть ли способ, при котором мы можем повторить попытку отправки ответного сообщения только в случае потери соединения, а не повторной обработки запроса? Я посмотрел на ConditionalRejectingErrorHandler.DefaultExceptionStrategy, у него есть доступ только к исходному запросу, нет способа получить доступ к ответу, потерянному во время сбоя соединения. Пожалуйста, предложите, как лучше всего справиться с этим?

наш код выглядит так:

SpringBootApplication
@EnableJpaRepositories("com.***")
@EnableJpaAuditing
@EnableTransactionManagement
@EnableEncryptableProperties
public class PcaClinicalValidationApplication {

@RabbitListener(queues = "myqueue"
  @SendTo("exchange/routingKey")
  @Timed) description = "Time taken to process a request")
  public Message receivemessage(HashMap<String, Object> myMap, Message requestMessage)
      throws Exception {
   //business logic goes here
Message message = MessageBuilder.fromMessage(requestMessage)
//add some headers
return message;

}

public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setRetryTemplate(new RetryTemplate());
        factory.setReplyRecoveryCallback(ctx -> {
            Message failed = SendRetryContextAccessor.getMessage(ctx);          
            Address replyTo = SendRetryContextAccessor.getAddress(ctx);
            Throwable t = ctx.getLastThrowable();

     //wrote to a file          
     serializer.serialize(failed);
            return null;
        });
        return factory;
    }

1 Ответ

0 голосов
/ 14 января 2020

Фабрика контейнера слушателя использует RabbitTemplate в своем свойстве replyTemplate - это используется для отправки ответа.

Вы можете настроить RetryTemplate в этом RabbitTemplate, чтобы повторить попытку отправки ответа .

Когда повторные попытки исчерпаны, вы можете добавить RecoveryCallback, который получит отказавший ответ, и вы можете сохранить его где-нибудь и использовать его, когда произойдет повторная доставка.

...