У меня есть ActiveMQ, где я настроил Redelivery на стороне клиента. С простым потребителем он работает, как и ожидалось, с приведенными ниже конфигурациями:
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
...
@Bean
public ConnectionFactory atomikosConnectionFactoryBean() {
String mqUrl = System.getenv("MQ_URL");
AtomikosConnectionFactoryBean atomikos = new AtomikosConnectionFactoryBean();
atomikos.setLocalTransactionMode(false);
atomikos.setMaxPoolSize(10);
atomikos.setUniqueResourceName("QUEUE_BROKER");
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(4);
redeliveryPolicy.setBackOffMultiplier(10);
redeliveryPolicy.setRedeliveryDelay(1000L);
redeliveryPolicy.setInitialRedeliveryDelay(1000L);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
ActiveMQXAConnectionFactory xaConnectionFactoryBean = new ActiveMQXAConnectionFactory(System.getenv("MQ_USERNAME"), System.getenv("MQ_PASSWORD"), mqUrl);
xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
xaConnectionFactoryBean.setNonBlockingRedelivery(true);
atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
return atomikos;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(new EHealthEventErrorHandler());
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setDestinationResolver(new EHealthDestinationResolver());
factory.setSessionTransacted(true);
return factory;
}
@Bean(autowire = Autowire.BY_TYPE)
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(atomikosConnectionFactoryBean());
jmsTemplate.setDestinationResolver(new EHealthDestinationResolver());
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
...
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;
@Transactional
@JmsListener(destination = "XXX")
public void onMessageReceived(XXXEvent event) {
throw new Exception();
}
Таким образом, вышеприведенное работает как положено, и сообщение доставляется с помощью стратегии ExponentialBackOff.
НО оно идет в сторону, когда получатель сообщения (onMessageReceived) вызывает метод класса, который отправляет сообщение в другую очередь в новой транзакции. Тогда сообщение не доставляется, если после совершения новой транзакции возникает исключение, например:
import org.springframework.transaction.annotation.Transactional;
public class FooClass {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void createInNewTransaction() {
sendMessageToAnotherQueue();
}
}
import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;
@Transactional
@JmsListener(destination = "Foo")
public void onMessageReceived(FooEvent event) {
fooClass.createInNewTransaction();
throw new Exception();
}
В приведенной ниже трассировке стека видно, что org. apache .activemq. TransactionContext.synchronizations обнуляются при отправке сообщения в новой транзакции. TransactionContext.synchronizations содержит ActiveMQMessageConsumer, который используется для получения сообщения и необходим для повторной доставки после возникновения исключения. Когда это очищено, сообщение не доставляется:
private void afterRollback() throws JMSException {
if (synchronizations == null) {
return;
}
...
}
Это метод com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse (), который обнаруживает, что контекст транзакции отличается и выдает исключение, которое перехватывается в SessionHandleState.notifyBeforeUse ():
TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction currentTx)
throws InvalidSessionHandleStateException, UnexpectedTransactionContextException
{
if ( currentTx == null || !currentTx.isSameTransaction ( ct ) ) {
//OOPS! we are being used a different tx context than the one expected...
//TODO check: what if subtransaction? Possible solution: ignore if serial_jta mode, error otherwise.
String msg = "The connection/session object is already enlisted in a (different) transaction.";
if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
throw new UnexpectedTransactionContextException();
}
//tx context is still the same -> no change in state required
return null;
}
Затем создается новый контекст и вызывается currentContext.checkEnlistBeforeUse (ct), который завершает очистку TransactionContext. синхронизации
В BranchEnlistedStateHandler.checkEnlistBeforeUse () есть комментарий: "// Проверка TODO: что если субтранзакция? Возможное решение: игнорировать, если используется serial_jta, иначе ошибка."
У меня есть субтранзакция и для com.atomikos.icatch.serial_jta_transactions установлено значение true. Мне просто не повезло столкнуться с чем-то, что еще не поддерживается?
Используемые версии: "org.springframework: spring-jms: 5.1.10.RELEASE", "com.atomikos: Transactions: 5.0.3" , "org. apache .activemq: activemq-client: 5.15.10"
Пытались перейти на новейшие версии, но ничего не изменилось.