У меня есть приложение Spring Integration, развернутое на Jetty, которое выбирает сообщения (как XML, так и JSON) из IBM MQ, выполняет некоторые преобразования и сохраняет преобразованные сообщения в Oracle.
В настоящее время мы проводим нагрузочное тестированиеи производительность довольно низкая, примерно 15 сообщений в секунду отбирается из очереди в нашей среде NFT.Хотя наш инструмент мониторинга;AppD показывает, что для сохранения Oracle требуется всего ~ 20 мс, а на других этапах - почти ничего.
При сборке на наших локальных машинах мы можем видеть значительно более высокую пропускную способность (~ 100 сообщений в секунду), и я хочучтобы подтвердить это с помощью некоторой регистрации границ;то есть за скорость извлечения сообщений и скорость сохранения сообщений в БД.Это может помочь выявить узкие места или даже проблемы в сети.
Я использую управляемый сообщениями канал-адаптер
<jms:message-driven-channel-adapter id="jmsIn"
channel="routingChannel"
container="DefaultJmsListenerContainer" />
и соединения MQ;с 5-10 для параллелизма
public MQQueue createRequestQueue() throws IllegalStateException, JMSException {
return new MQQueue(env.getRequiredProperty(QUEUE_MANAGER), incomingQueue);
}
@Bean(name="DefaultJmsListenerContainer")
public DefaultMessageListenerContainer provideJmsListenerContainer() throws IllegalStateException, JMSException {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setTransactionManager(provideTransactionManager());
container.setConcurrency(jmsConcurrency);
container.setCacheLevel(jmsCacheLevel);
container.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
container.setSessionTransacted(true);
container.setDestination(createRequestQueue());
return container;
}
@Bean(name="MQConnectionFactory")
public ConnectionFactory connectionFactory() {
if (factory == null) {
factory = new MQConnectionFactory();
try {
factory.setConnectionNameList(env.getRequiredProperty(HOST));
factory.setPort(Integer.parseInt(env.getRequiredProperty(PORT)));
factory.setQueueManager(env.getRequiredProperty(QUEUE_MANAGER));
factory.setChannel(env.getRequiredProperty(CHANNEL));
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
factory.setSSLCipherSuite(env.getRequiredProperty(SSL_CIPHER_SUITE));
factory.setStringProperty(WMQConstants.USERID, env.getRequiredProperty(QUEUE_USERID));
factory.setStringProperty(WMQConstants.PASSWORD, env.getRequiredProperty(QUEUE_PASSWORD));
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
return factory;
}
@Bean(name = "jmsTransactionManager")
public JmsTransactionManager provideTransactionManager() {
return new JmsTransactionManager(connectionFactory());
}
@Bean(name="jdbcTransactionManager")
public DataSourceTransactionManager jdbcTransactionManager() {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dataSource());
return dataSourceTransactionManager;
}
@Bean
public DataSource dataSource() {
final JndiDataSourceLookup dsLookup = new JndiDataSourceLookup();
dsLookup.setResourceRef(true);
DataSource dataSource = dsLookup.getDataSource("jdbc/pxxds");
return dataSource;
}
Для части персистентности Oracle я использую ServiceActivator для вызова JDBCTemplace для персистентности полезной нагрузки (модель записи) и ставлю в очередь сообщения в QueueChannel и использую агрегатор дляПакетная устойчивость к гибернации (модель чтения)
Когда я вижу блоги, подобные приведенному ниже, обрабатывающие миллион транзакций в секунду, это довольно наглядно, и я хочу найти основную причину
https://fr.slideshare.net/joshlong/dynamic-routing-at-1-million-messages-per-second-with-spring-integration
Я также не уверен, что делать, если мне нужно прочитать 10 сообщений из очереди, а затем сохранить в пакетном режиме, как должен быть сконфигурирован адаптер управляемых сообщениями каналов и при этом обеспечитьчто JMS Transaction Management работает вместе с JDBC Transaction Management.
Я не смог найти хорошо объясненную статью об управлении транзакциями в Spring Integration при использовании 2 менеджеров транзакций, как в моем случае.И требование с нашей стороны - достичь нулевой потери данных , поэтому мне важно понять.
Итак, 2 вопроса:
- как настроить границуведение журнала для текущего сценария
- Есть ли какие-либо несоответствия, которые очевидны из конфигурации выше
Cheers Kris