ActiveMQ Redelivery не работает с новой транзакцией в потребителе - PullRequest
0 голосов
/ 28 марта 2020

У меня есть ActiveMQ, где я настроил Redelivery на стороне клиента. С простым потребителем он работает, как и ожидалось, с приведенными ниже конфигурациями:

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

...
    @Bean
    public ConnectionFactory atomikosConnectionFactoryBean() {
        String mqUrl = System.getenv("MQ_URL");
        AtomikosConnectionFactoryBean atomikos = new AtomikosConnectionFactoryBean();
        atomikos.setLocalTransactionMode(false);
        atomikos.setMaxPoolSize(10);
        atomikos.setUniqueResourceName("QUEUE_BROKER");

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(4);
        redeliveryPolicy.setBackOffMultiplier(10);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
        ActiveMQXAConnectionFactory xaConnectionFactoryBean = new ActiveMQXAConnectionFactory(System.getenv("MQ_USERNAME"), System.getenv("MQ_PASSWORD"), mqUrl);
        xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
        xaConnectionFactoryBean.setNonBlockingRedelivery(true);
        atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
        return atomikos;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setErrorHandler(new EHealthEventErrorHandler());
        factory.setMessageConverter(jacksonJmsMessageConverter());
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setDestinationResolver(new EHealthDestinationResolver());
        factory.setSessionTransacted(true);
        return factory;
    }

    @Bean(autowire = Autowire.BY_TYPE)
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(atomikosConnectionFactoryBean());
        jmsTemplate.setDestinationResolver(new EHealthDestinationResolver());
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
    ...
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "XXX")
public void onMessageReceived(XXXEvent event) {
    throw new Exception();
}

Таким образом, вышеприведенное работает как положено, и сообщение доставляется с помощью стратегии ExponentialBackOff.

НО оно идет в сторону, когда получатель сообщения (onMessageReceived) вызывает метод класса, который отправляет сообщение в другую очередь в новой транзакции. Тогда сообщение не доставляется, если после совершения новой транзакции возникает исключение, например:

import org.springframework.transaction.annotation.Transactional;

public class FooClass {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createInNewTransaction() {
        sendMessageToAnotherQueue();
    }
}
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@JmsListener(destination = "Foo")
public void onMessageReceived(FooEvent event) {
    fooClass.createInNewTransaction();
    throw new Exception();
}

В приведенной ниже трассировке стека видно, что org. apache .activemq. TransactionContext.synchronizations обнуляются при отправке сообщения в новой транзакции. TransactionContext.synchronizations содержит ActiveMQMessageConsumer, который используется для получения сообщения и необходим для повторной доставки после возникновения исключения. Когда это очищено, сообщение не доставляется: enter image description here

private void afterRollback() throws JMSException {
        if (synchronizations == null) {
            return;
        }
    ...
}

Это метод com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse (), который обнаруживает, что контекст транзакции отличается и выдает исключение, которое перехватывается в SessionHandleState.notifyBeforeUse ():

TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction currentTx)
            throws InvalidSessionHandleStateException, UnexpectedTransactionContextException 
    {

        if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
            //OOPS! we are being used a different tx context than the one expected...

            //TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise.

            String msg = "The connection/session object is already enlisted in a (different) transaction.";
            if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
            throw new UnexpectedTransactionContextException();
        } 

        //tx context is still the same -> no change in state required
        return null;
    }

Затем создается новый контекст и вызывается currentContext.checkEnlistBeforeUse (ct), который завершает очистку TransactionContext. синхронизации

В BranchEnlistedStateHandler.checkEnlistBeforeUse () есть комментарий: "// Проверка TODO: что если субтранзакция? Возможное решение: игнорировать, если используется serial_jta, иначе ошибка."

У меня есть субтранзакция и для com.atomikos.icatch.serial_jta_transactions установлено значение true. Мне просто не повезло столкнуться с чем-то, что еще не поддерживается?

Используемые версии: "org.springframework: spring-jms: 5.1.10.RELEASE", "com.atomikos: Transactions: 5.0.3" , "org. apache .activemq: activemq-client: 5.15.10"

Пытались перейти на новейшие версии, но ничего не изменилось.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...