Контейнер Spring Listener - Менеджер транзакций - Spring Retry - среди нескольких транзакций - PullRequest
3 голосов
/ 05 ноября 2019

У меня есть служба загрузки Spring с RabbitMQ.

Я разрабатываю тест, чтобы убедиться, что процесс сохраняет состояние транзакции среди нескольких транзакций.

Пример:

@Transactional
@RabbitListener
Queue Listener 1
1- receive message
2- call to Class 1 Method 1

@Transactional
Class 1 Method 1
1- Persist some data in data base
2- Publish a Message in a 2nd Queue

@Transactional
@RabbitListener
Queue Listener 2
- read message
- doStuff()

Я бы хотел, чтобы мой сервис имел следующее поведение:

Если в Queue Listener 2 выдается исключение (например, в doStuff()), я бы также хотел откатить транзакцию Class 1 Method 1. И я хотел бы также возможности Retries.

Конфигурирование транзакций с Propagation.SUPPORTS или Propagation.REQUIRED, похоже, тоже не работает.

Слушатели кроликов помечаются как @ Transactional.

Для этого я настроил SimpleRabbitListenerContainerFactory Advice Chain как StatefulRetryOperationsInterceptor.

. Я настроил Менеджер транзакций, который в моем Приложении - JpaTransactionManager.

Канал шаблонов кроликов. Транзакции установлены наtrue,

Также конвертер сообщений с jackson2JsonMessageConverter.setCreateMessageIds(true);, для хранения идентификаторов сообщений.

Здесь я прилагаю свою SimpleRabbitListenerContainerFactory config:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                               final PlatformTransactionManager platformTransactionManager) throws IOException {

        final SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
        final CachingConnectionFactory cachingConnectionFactory = cachingConnectionFactory();

        container.setConnectionFactory(cachingConnectionFactory);
        container.setMessageConverter(jackson2JsonMessageConverter());
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setConsecutiveIdleTrigger(1);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setChannelTransacted(true);

        container.setTransactionManager(platformTransactionManager);

        container.setConcurrentConsumers(listenerConcurrency);
        container.setMaxConcurrentConsumers(maxListenerConcurrency);

        final StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor = RetryInterceptorBuilder.stateful()
                .retryPolicy(new SimpleRetryPolicy(maxAttempts, exceptionsTriggeringRetry()))
                .backOffPolicy(backOffPolicy())
                .build();

        container.setAdviceChain(statefulRetryOperationsInterceptor);
        configurer.configure(container, cachingConnectionFactory);

        return container;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {

        final ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper);
        jackson2JsonMessageConverter.setCreateMessageIds(true);

        return jackson2JsonMessageConverter;
    }

    @Bean
    public RabbitTemplate transactedRabbitTemplate(@Named("cachingConnectionFactory") final ConnectionFactory connectionFactory,
                                                  @Named("jackson2JsonMessageConverter") final Jackson2JsonMessageConverter jackson2MessageConverter){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2MessageConverter);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

Политика повторных попыток и ееколичество повторных попыток, по-видимому, соблюдается,

, но целостность транзакции (с откатом), по-видимому, не выполняется (поэтому откатов в предыдущих транзакциях нет. Т.е.: сбой 3-й транзакции не вызывает откат 2-й транзакции), но, конечно,ошибки внутри одной и той же транзакции работают правильно.

РЕДАКТИРОВАТЬ

Пробовал также:

container.setAdviceChain(new TransactionInterceptor(platformTransactionManager, new Properties()));

, но, похоже, не работаетв любом случае.

1 Ответ

0 голосов
/ 05 ноября 2019

Транзакция не может занимать такую ​​очередь;публикация в очередь - это конец пути для первой транзакции.

Производители и потребители изолированы друг от друга.

Поскольку вы также используете транзакции Rabbit, Listener 2 не будетдаже увидеть сообщение, пока первая транзакция не завершится.

...