У меня есть служба загрузки Spring с RabbitMQ.
Я разрабатываю тест, чтобы убедиться, что процесс сохраняет состояние транзакции среди нескольких транзакций.
Пример:
@Transactional
@RabbitListener
Queue Listener 1
1- receive message
2- call to Class 1 Method 1
@Transactional
Class 1 Method 1
1- Persist some data in data base
2- Publish a Message in a 2nd Queue
@Transactional
@RabbitListener
Queue Listener 2
- read message
- doStuff()
Я бы хотел, чтобы мой сервис имел следующее поведение:
Если в Queue Listener 2
выдается исключение (например, в doStuff()
), я бы также хотел откатить транзакцию Class 1 Method 1
. И я хотел бы также возможности Retries.
Конфигурирование транзакций с Propagation.SUPPORTS
или Propagation.REQUIRED
, похоже, тоже не работает.
Слушатели кроликов помечаются как @ Transactional.
Для этого я настроил SimpleRabbitListenerContainerFactory
Advice Chain как StatefulRetryOperationsInterceptor
.
. Я настроил Менеджер транзакций, который в моем Приложении - JpaTransactionManager
.
Канал шаблонов кроликов. Транзакции установлены наtrue,
Также конвертер сообщений с jackson2JsonMessageConverter.setCreateMessageIds(true);
, для хранения идентификаторов сообщений.
Здесь я прилагаю свою SimpleRabbitListenerContainerFactory
config:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
final PlatformTransactionManager platformTransactionManager) throws IOException {
final SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
final CachingConnectionFactory cachingConnectionFactory = cachingConnectionFactory();
container.setConnectionFactory(cachingConnectionFactory);
container.setMessageConverter(jackson2JsonMessageConverter());
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setConsecutiveIdleTrigger(1);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setChannelTransacted(true);
container.setTransactionManager(platformTransactionManager);
container.setConcurrentConsumers(listenerConcurrency);
container.setMaxConcurrentConsumers(maxListenerConcurrency);
final StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor = RetryInterceptorBuilder.stateful()
.retryPolicy(new SimpleRetryPolicy(maxAttempts, exceptionsTriggeringRetry()))
.backOffPolicy(backOffPolicy())
.build();
container.setAdviceChain(statefulRetryOperationsInterceptor);
configurer.configure(container, cachingConnectionFactory);
return container;
}
@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper);
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
@Bean
public RabbitTemplate transactedRabbitTemplate(@Named("cachingConnectionFactory") final ConnectionFactory connectionFactory,
@Named("jackson2JsonMessageConverter") final Jackson2JsonMessageConverter jackson2MessageConverter){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jackson2MessageConverter);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
Политика повторных попыток и ееколичество повторных попыток, по-видимому, соблюдается,
, но целостность транзакции (с откатом), по-видимому, не выполняется (поэтому откатов в предыдущих транзакциях нет. Т.е.: сбой 3-й транзакции не вызывает откат 2-й транзакции), но, конечно,ошибки внутри одной и той же транзакции работают правильно.
РЕДАКТИРОВАТЬ
Пробовал также:
container.setAdviceChain(new TransactionInterceptor(platformTransactionManager, new Properties()));
, но, похоже, не работаетв любом случае.