Spring Integration Boundary Logging - PullRequest
       62

Spring Integration Boundary Logging

0 голосов
/ 13 июня 2018

У меня есть приложение Spring Integration, развернутое на Jetty, которое выбирает сообщения (как XML, так и JSON) из IBM MQ, выполняет некоторые преобразования и сохраняет преобразованные сообщения в Oracle.

В настоящее время мы проводим нагрузочное тестированиеи производительность довольно низкая, примерно 15 сообщений в секунду отбирается из очереди в нашей среде NFT.Хотя наш инструмент мониторинга;AppD показывает, что для сохранения Oracle требуется всего ~ 20 мс, а на других этапах - почти ничего.

При сборке на наших локальных машинах мы можем видеть значительно более высокую пропускную способность (~ 100 сообщений в секунду), и я хочучтобы подтвердить это с помощью некоторой регистрации границ;то есть за скорость извлечения сообщений и скорость сохранения сообщений в БД.Это может помочь выявить узкие места или даже проблемы в сети.

Я использую управляемый сообщениями канал-адаптер

<jms:message-driven-channel-adapter id="jmsIn"
        channel="routingChannel"
        container="DefaultJmsListenerContainer" />

и соединения MQ;с 5-10 для параллелизма

    public MQQueue createRequestQueue() throws IllegalStateException, JMSException {
    return new MQQueue(env.getRequiredProperty(QUEUE_MANAGER), incomingQueue);
}

    @Bean(name="DefaultJmsListenerContainer")
 public DefaultMessageListenerContainer provideJmsListenerContainer() throws IllegalStateException, JMSException {
     DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
     container.setConnectionFactory(connectionFactory());     
     container.setTransactionManager(provideTransactionManager());
     container.setConcurrency(jmsConcurrency);
     container.setCacheLevel(jmsCacheLevel);
     container.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
     container.setSessionTransacted(true);
     container.setDestination(createRequestQueue());

     return container;
 }

    @Bean(name="MQConnectionFactory")
public ConnectionFactory connectionFactory() {

    if (factory == null) {
        factory = new MQConnectionFactory();
        try {
            factory.setConnectionNameList(env.getRequiredProperty(HOST));
            factory.setPort(Integer.parseInt(env.getRequiredProperty(PORT)));            
            factory.setQueueManager(env.getRequiredProperty(QUEUE_MANAGER));
            factory.setChannel(env.getRequiredProperty(CHANNEL));
            factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            factory.setSSLCipherSuite(env.getRequiredProperty(SSL_CIPHER_SUITE));
            factory.setStringProperty(WMQConstants.USERID, env.getRequiredProperty(QUEUE_USERID));
            factory.setStringProperty(WMQConstants.PASSWORD, env.getRequiredProperty(QUEUE_PASSWORD));

        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
    return factory;
}


     @Bean(name = "jmsTransactionManager")
 public JmsTransactionManager provideTransactionManager() {      
     return new JmsTransactionManager(connectionFactory());
 }

    @Bean(name="jdbcTransactionManager")
public DataSourceTransactionManager jdbcTransactionManager() {
    DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
    dataSourceTransactionManager.setDataSource(dataSource());
    return dataSourceTransactionManager;
}

@Bean
public DataSource dataSource() {
    final JndiDataSourceLookup dsLookup = new JndiDataSourceLookup();
    dsLookup.setResourceRef(true);        
    DataSource  dataSource = dsLookup.getDataSource("jdbc/pxxds");        
    return dataSource;
}

Для части персистентности Oracle я использую ServiceActivator для вызова JDBCTemplace для персистентности полезной нагрузки (модель записи) и ставлю в очередь сообщения в QueueChannel и использую агрегатор дляПакетная устойчивость к гибернации (модель чтения)

Когда я вижу блоги, подобные приведенному ниже, обрабатывающие миллион транзакций в секунду, это довольно наглядно, и я хочу найти основную причину

https://fr.slideshare.net/joshlong/dynamic-routing-at-1-million-messages-per-second-with-spring-integration

Я также не уверен, что делать, если мне нужно прочитать 10 сообщений из очереди, а затем сохранить в пакетном режиме, как должен быть сконфигурирован адаптер управляемых сообщениями каналов и при этом обеспечитьчто JMS Transaction Management работает вместе с JDBC Transaction Management.

Я не смог найти хорошо объясненную статью об управлении транзакциями в Spring Integration при использовании 2 менеджеров транзакций, как в моем случае.И требование с нашей стороны - достичь нулевой потери данных , поэтому мне важно понять.

Итак, 2 вопроса:

  1. как настроить границуведение журнала для текущего сценария
  2. Есть ли какие-либо несоответствия, которые очевидны из конфигурации выше

Cheers Kris

1 Ответ

0 голосов
/ 13 июня 2018

Возможно, проблема в том, что вы упоминаете QueueChannel.Этот действительно основан на каком-то опросчике, и если он настроен на что-то вроде fixedDelay = 1000, так что это будет узким местом.Другой вариант для этого опроса - это maxMessagesPerPoll = -1.

. Я не уверен, какой у вас boundary logging.Никогда не слышал такого термина, но вы действительно можете включить уровень ведения журнала DEBUG для org.springframework.integration, и в журналах вы увидите, как сообщения проходят через компоненты Spring Integration.

Для DataSourceTransactionManager и JmsTransactionManager комбинацию, которую мы всегда рекомендуем прочитать в статье Дэйва Сайера о статье 1PC Distributed Transactions: https://www.javaworld.com/article/2077963/open-source-tools/distributed-transactions-in-spring--with-and-without-xa.html. Также в данных Spring уже есть ChainedTransactionManager.

...