Я нахожусь в процессе обновления существующего Java SE Camel Application, чтобы использовать управляемый Spring JPA EntityManger с помощью внедрения @PersitentContext. В настоящее время он создается в нашем коде с использованием Persistence.createEntityManagerFactory и работает хорошо. Но нам бы хотелось иметь более гибкое и тестируемое приложение, используя внедрение этого entityManager, используя мощные возможности @Transactional aop, а также переключаться между локальными и глобальными транзакциями XA, просто используя различные @Configuration @ Profiles.
Моим первым тестом было тестирование транзакций xa с использованием ресурсов JMS и JPA с помощью Atomikos TransactionManager, и оно работало.
Теперь в этом приложении я хочу протестировать локальные транзакции, используя OpenJPA, Camel и JpaTransactionManager, т.е. без участия транзакции XA.
У меня есть модульные тесты, которые довольно хорошо работают без Camel, ткачество @Transactional выполняет работу по запуску транзакций с помощью jpaTransactionManager, и все в порядке.
Теперь, когда я пытаюсь интегрировать мои компоненты конфигурации в верблюжьи приложения, дела идут не очень хорошо.
У меня есть потребитель JMS, который запускает верблюжий процессор, где метод процесса помечен @Transactional.
Я думаю, что что-то неправильно настроил, потому что JMSConsumer запускает транзакцию, даже если я не настроил компонент JMS явно для транзакции (нет transacted () в маршруте, а не диспетчер транзакций, установленный в JMSConfiguration)
Вот точка останова, показывающая, что JmsConsumer пытается зарегистрироваться для синхронизации транзакций:
Daemon Thread [Camel (worker) thread #1 - JmsConsumer[<consumerName>]] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
TransactionSynchronizationManager.bindResource(Object, Object) line: 175
DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).doReceiveAndExecute(Object, Session, MessageConsumer, TransactionStatus) line: 313
DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).receiveAndExecute(Object, Session, MessageConsumer) line: 255
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener() line: 1168
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop() line: 1160
DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 1057
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
ThreadPoolExecutor$Worker.run() line: 617
Thread.run() line: 745 [local variables unavailable]
Я не вижу других ресурсов, связанных с TransactionSynchronisationManager.
Напротив, в моих модульных тестах все идет хорошо, и мы видим в стеке трассировку @Transactional, играющую свою роль, как и ожидалось:
Thread [main] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
TransactionSynchronizationManager.bindResource(Object, Object) line: 175
JpaTransactionManager.doBegin(Object, TransactionDefinition) line: 406
JpaTransactionManager(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 377
TransactionInterceptor(TransactionAspectSupport).createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String) line: 461
TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 277
TransactionInterceptor.invoke(MethodInvocation) line: 96
CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 179
CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 671
MyRepo$$EnhancerBySpringCGLIB$$5edadd7f.createRun(String) line: not available
DataSourceConfigTest.testIt2() line: 38
...
Очевидно, что здесь что-то неправильно настроено, кажется, что @Transactional полностью игнорируется или переопределяется какой-то волшебной автоконфигурацией SpringBoot. У кого-нибудь есть подсказка?
Вот моя конфигурация CamelConfiguration и My Data / JPA
Camel Config
@Configuration
public class CamelConfiguration {
protected static Logger LOGGER = LoggerFactory.getLogger(CamelConfiguration.class);
@Value("${broker.mqURL}")
String mqURL;
/**
* The runId is injected from command Line arguments
*/
@Value("${runId}")
long runId;
@Bean
CamelContextConfiguration contextConfiguration() {
return new CamelContextConfiguration() {
@Override
public void beforeApplicationStart(CamelContext context) {
((SpringCamelContext)context).setName("worker");
DefaultShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
shutdownStrategy.setTimeUnit(TimeUnit.SECONDS);
shutdownStrategy.setTimeout(5);
context.setShutdownStrategy(shutdownStrategy);
}
@Override
public void afterApplicationStart(CamelContext camelContext) {
try {
LOGGER.info("Starting route dataspecProcessing." + runId);
camelContext.startAllRoutes();
} catch (Exception e) {
LOGGER.error("Error during Camel post start",e);
}
}
};
}
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public ActiveMQConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(mqURL);
connectionFactory.setTrustAllPackages(true);
return connectionFactory;
}
@Bean
@Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
public JmsConfiguration myJmsConfiguration(ActiveMQConnectionFactory jmsConnectionFactory,JpaTransactionManager jpaTransactionManager) {
JmsConfiguration jmsConfiguration = new JmsConfiguration();
// jmsConfiguration.setTransacted(true);
// jmsConfiguration.setLazyCreateTransactionManager(false);
// jmsConfiguration.setTransactionManager(jpaTransactionManager);
jmsConfiguration.setConnectionFactory(jmsConnectionFactory);
jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
return jmsConfiguration;
}
Jpa Config
@Configuration
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class DataSourceConfig {
@Bean
public JpaTransactionManager jpaTransactionManager() {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
transactionManager.setDataSource(dataSource());
transactionManager.setJpaDialect(new OpenJpaDialect());
return transactionManager;
}
@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setPersistenceXmlLocation("META-INF/persistence.xml");
em.setJpaProperties(additionalProperties());
em.setDataSource(dataSource());
em.setValidationMode(ValidationMode.NONE);
JpaVendorAdapter vendorAdapter = new OpenJpaVendorAdapter();
em.setJpaVendorAdapter(vendorAdapter);
em.setJpaProperties(additionalProperties());
return em;
}
@Bean
public DataSource dataSource() {
PGSimpleDataSource pgDataSource = new PGSimpleDataSource();
pgDataSource.setUrl(...);
pgDataSource.setUser(...);
pgDataSource.setPassword(...);
return pgDataSource;
}
И код, который должен быть транзакционным
@Component
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class MyCamelProcessor implements Processor {
...
@Transactional(propagation=Propagation.REQUIRES_NEW,transactionManager="jpaTransactionManager")
public void process(Exchange exchange) throws Exception {
... calls to entitymanager here ...
Маршрут вызова процессора
@Component
public class DataSpecProcessingRoute extends SpringRouteBuilder {
....
public void configure() throws Exception {
...
from("jms:queue:thequeue"?concurrentConsumers=" + numConsumerThreads)
.autoStartup(false)
.shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
.routeId("dataspecProcessing." + runId)
.process(initialisationProcessor)
.process(processorWithTransactionnalNeeded)
.to("jms:queue:anotherqueue") ;
EDIT
Работает с программными транзакциями, а не с аннотацией @Transactional.