Spring @Transactional JPA в процессоре JMSConsumer Camel - PullRequest
0 голосов
/ 02 ноября 2018

Я нахожусь в процессе обновления существующего Java SE Camel Application, чтобы использовать управляемый Spring JPA EntityManger с помощью внедрения @PersitentContext. В настоящее время он создается в нашем коде с использованием Persistence.createEntityManagerFactory и работает хорошо. Но нам бы хотелось иметь более гибкое и тестируемое приложение, используя внедрение этого entityManager, используя мощные возможности @Transactional aop, а также переключаться между локальными и глобальными транзакциями XA, просто используя различные @Configuration @ Profiles.

Моим первым тестом было тестирование транзакций xa с использованием ресурсов JMS и JPA с помощью Atomikos TransactionManager, и оно работало.

Теперь в этом приложении я хочу протестировать локальные транзакции, используя OpenJPA, Camel и JpaTransactionManager, т.е. без участия транзакции XA.

У меня есть модульные тесты, которые довольно хорошо работают без Camel, ткачество @Transactional выполняет работу по запуску транзакций с помощью jpaTransactionManager, и все в порядке.

Теперь, когда я пытаюсь интегрировать мои компоненты конфигурации в верблюжьи приложения, дела идут не очень хорошо.

У меня есть потребитель JMS, который запускает верблюжий процессор, где метод процесса помечен @Transactional.

Я думаю, что что-то неправильно настроил, потому что JMSConsumer запускает транзакцию, даже если я не настроил компонент JMS явно для транзакции (нет transacted () в маршруте, а не диспетчер транзакций, установленный в JMSConfiguration)

Вот точка останова, показывающая, что JmsConsumer пытается зарегистрироваться для синхронизации транзакций:

Daemon Thread [Camel (worker) thread #1 - JmsConsumer[<consumerName>]] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
    TransactionSynchronizationManager.bindResource(Object, Object) line: 175
    DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).doReceiveAndExecute(Object, Session, MessageConsumer, TransactionStatus) line: 313
    DefaultJmsMessageListenerContainer(AbstractPollingMessageListenerContainer).receiveAndExecute(Object, Session, MessageConsumer) line: 255
    DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener() line: 1168
    DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop() line: 1160
    DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 1057
    ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1142
    ThreadPoolExecutor$Worker.run() line: 617   
    Thread.run() line: 745 [local variables unavailable]

Я не вижу других ресурсов, связанных с TransactionSynchronisationManager.

Напротив, в моих модульных тестах все идет хорошо, и мы видим в стеке трассировку @Transactional, играющую свою роль, как и ожидалось:

Thread [main] (Suspended (breakpoint at line 175 in TransactionSynchronizationManager))
    TransactionSynchronizationManager.bindResource(Object, Object) line: 175
    JpaTransactionManager.doBegin(Object, TransactionDefinition) line: 406
    JpaTransactionManager(AbstractPlatformTransactionManager).getTransaction(TransactionDefinition) line: 377
    TransactionInterceptor(TransactionAspectSupport).createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String) line: 461
    TransactionInterceptor(TransactionAspectSupport).invokeWithinTransaction(Method, Class<?>, InvocationCallback) line: 277
    TransactionInterceptor.invoke(MethodInvocation) line: 96    
    CglibAopProxy$CglibMethodInvocation(ReflectiveMethodInvocation).proceed() line: 179
    CglibAopProxy$DynamicAdvisedInterceptor.intercept(Object, Method, Object[], MethodProxy) line: 671
    MyRepo$$EnhancerBySpringCGLIB$$5edadd7f.createRun(String) line: not available
    DataSourceConfigTest.testIt2() line: 38 
    ...

Очевидно, что здесь что-то неправильно настроено, кажется, что @Transactional полностью игнорируется или переопределяется какой-то волшебной автоконфигурацией SpringBoot. У кого-нибудь есть подсказка?

Вот моя конфигурация CamelConfiguration и My Data / JPA

Camel Config

@Configuration
public class CamelConfiguration {
    protected static Logger LOGGER = LoggerFactory.getLogger(CamelConfiguration.class);


    @Value("${broker.mqURL}")
    String mqURL;

    /**
     * The runId is injected from command Line arguments
     */
    @Value("${runId}")
    long runId;


    @Bean
    CamelContextConfiguration contextConfiguration() {
        return new CamelContextConfiguration() {
            @Override
            public void beforeApplicationStart(CamelContext context) {

                ((SpringCamelContext)context).setName("worker");

                DefaultShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy();
                shutdownStrategy.setTimeUnit(TimeUnit.SECONDS);
                shutdownStrategy.setTimeout(5);
                context.setShutdownStrategy(shutdownStrategy);
            }

            @Override
            public void afterApplicationStart(CamelContext camelContext) {
                try {
                    LOGGER.info("Starting route dataspecProcessing." + runId);
                    camelContext.startAllRoutes();
                } catch (Exception e) {
                    LOGGER.error("Error during Camel post start",e);
                }
            }
        };
    }

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(mqURL);
        connectionFactory.setTrustAllPackages(true);
        return connectionFactory;
    }

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
    public JmsConfiguration myJmsConfiguration(ActiveMQConnectionFactory jmsConnectionFactory,JpaTransactionManager jpaTransactionManager) {
        JmsConfiguration jmsConfiguration = new JmsConfiguration();
//      jmsConfiguration.setTransacted(true);
//      jmsConfiguration.setLazyCreateTransactionManager(false);
//      jmsConfiguration.setTransactionManager(jpaTransactionManager);
        jmsConfiguration.setConnectionFactory(jmsConnectionFactory);
        jmsConfiguration.setCacheLevelName("CACHE_CONSUMER");
        return jmsConfiguration;
    }

Jpa Config

@Configuration
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class DataSourceConfig {

    @Bean
    public JpaTransactionManager jpaTransactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
        transactionManager.setDataSource(dataSource());
        transactionManager.setJpaDialect(new OpenJpaDialect());
        return transactionManager;
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setPersistenceXmlLocation("META-INF/persistence.xml");
        em.setJpaProperties(additionalProperties());
        em.setDataSource(dataSource());
        em.setValidationMode(ValidationMode.NONE);
        JpaVendorAdapter vendorAdapter = new OpenJpaVendorAdapter();
        em.setJpaVendorAdapter(vendorAdapter);
        em.setJpaProperties(additionalProperties());

        return em;
    }

    @Bean
    public DataSource dataSource() {
        PGSimpleDataSource pgDataSource = new PGSimpleDataSource();
        pgDataSource.setUrl(...);
        pgDataSource.setUser(...);
        pgDataSource.setPassword(...);
        return pgDataSource;
    }

И код, который должен быть транзакционным

@Component
@EnableTransactionManagement(mode=AdviceMode.ASPECTJ)
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class MyCamelProcessor implements Processor {
...
@Transactional(propagation=Propagation.REQUIRES_NEW,transactionManager="jpaTransactionManager")
    public void process(Exchange exchange) throws Exception {
        ... calls to entitymanager here ...

Маршрут вызова процессора

@Component
public class DataSpecProcessingRoute extends SpringRouteBuilder {
....
public void configure() throws Exception {
...
from("jms:queue:thequeue"?concurrentConsumers=" + numConsumerThreads)
            .autoStartup(false)
        .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
                .routeId("dataspecProcessing." + runId)
                .process(initialisationProcessor)
                .process(processorWithTransactionnalNeeded)
.to("jms:queue:anotherqueue") ;

EDIT

Работает с программными транзакциями, а не с аннотацией @Transactional.

...