у нас есть микросервис, который использует сообщение с помощью @RabbitListener и сохраняет данные в базе данных, генерирует ответ об успешной обработке сообщения и отправляет его с помощью @sendTO в другую очередь для аудита.
При запуске Rabbit в режиме отработки отказа HA при отправке ответа в случае потери соединения обрабатываемое в данный момент сообщение корректно возвращается в очередь, но транзакция базы данных (в нашем случае транзакция jpa) не откатывается, ответ никогда не отправляется .
Я прочитал из этого выпуска (https://github.com/spring-projects/spring-amqp/issues/696), что это синхронизация транзакций «1P C» с максимальным усилием; RabbitMQ не поддерживает транзакции XA. Rabbit tx фиксируется после DB tx, и существует вероятность, что DB tx может зафиксировать и кролик откатится назад; вам приходится иметь дело с небольшой вероятностью дублирования сообщений.
Но в нашем случае, когда мы повторяем запрос, мы рассматриваем его как дублирующееся сообщение, и ответ на этот запрос никогда не создается. Есть ли способ, при котором мы можем повторить попытку отправки ответного сообщения только в случае потери соединения, а не повторной обработки запроса? Я посмотрел на ConditionalRejectingErrorHandler.DefaultExceptionStrategy, у него есть доступ только к исходному запросу, нет способа получить доступ к ответу, потерянному во время сбоя соединения. Пожалуйста, предложите, как лучше всего справиться с этим?
наш код выглядит так:
SpringBootApplication
@EnableJpaRepositories("com.***")
@EnableJpaAuditing
@EnableTransactionManagement
@EnableEncryptableProperties
public class PcaClinicalValidationApplication {
@RabbitListener(queues = "myqueue"
@SendTo("exchange/routingKey")
@Timed) description = "Time taken to process a request")
public Message receivemessage(HashMap<String, Object> myMap, Message requestMessage)
throws Exception {
//business logic goes here
Message message = MessageBuilder.fromMessage(requestMessage)
//add some headers
return message;
}
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setRetryTemplate(new RetryTemplate());
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
//wrote to a file
serializer.serialize(failed);
return null;
});
return factory;
}