У нас есть отдельное Java-приложение, использующее сторонний инструмент для управления пулами соединений, которое долгое время работало для нас при настройке v6_client + v6_server.
Теперь мы пытаемся перейти с v6 на v9 (да, мы опоздали на вечеринку .....) и обнаружили, что соединение v9_client с соединением v6_server постоянно прерывается, что означает:
- Сокет, созданный XAQueueConnectionFactory # createXAConnection (), всегда закрывается немедленнои созданный XAConnection, кажется, не знает об этом.
- Из-за вышеупомянутого закрытия сокета XASession, созданный из XAConnection.createXASession (), всегда создает новый сокет и закрывает сокет после XASession.close ().
Мы прошли черезполный список настраиваемых параметров для v9_client (столбец XAQCF в https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.0.0/com.ibm.mq.ref.dev.doc/q111800_.html) и только два потенциальных новых конфига, которые мы не использовали в v6_client, SHARECONVALLOWED и PROVIDERVERSION . К сожалению, нетпомогает нам ..... В частности:
- Мы попробовали setShareConvAllowed (WMQConstants.WMQ_SHARE_CONV_ALLOWED_ [YES / NO]) Учитывая, что на стороне v6_server нет свойства SHARECNV, что неудивительно.
- Мы попробовали «Миграция / Ограниченный / Нормальный режим» с помощью setProviderVersion («[6/7/8») ([7/8] выбрасывает исключения, как и ожидалось ....).
ПростоИнтересно, если бы кто-то еще имел подобный опыт и мог бы поделиться некоторыми идеями. Мы попробовали v9_server + v9_client и не видели подобной проблемы, так что это также может быть нашим возможным решением .....
Кстати, WMQ размещен на linux (RedHat), и мы используем только продукты MQXAQueueConnectionFactory на стороне клиента (класс ibm mq для jms).
Спасибо.
Дополнительные сведения / обновления.
[update-1]
- [настройка playgrond]
банок v9_client:
javax.jms-api-2.0.jar
com.ibm.mq.allclient(-9.0.0.[1/5]).jar
банок v6_client: в дополнение к банкам v9_client добавлены следующие баночки в classpath eclipse
com.ibm.dhbcore-1.0.jar
com.ibm.mq.pcf-6.0.3.jar
com.ibm.mqjms-6.0.2.2.jar
com.ibm.mq-6.0.2.2.jar
com.ibm.mqetclient-6.0.2.2.jar
connector.jar
jta-1.1.jar
Код тестирования -одиночный поток:
import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;
public class MQSeries_simpleAuditQ {
private static String queueManager = "QM.RCTQ.ALL.01";
private static String host = "localhost";
private static int port = 26005;
public static void main(String[] args) throws JMSException {
MQXAQueueConnectionFactory queueFactory= new MQXAQueueConnectionFactory();
System.out.println("\n\n\n*******************\nqueueFactory implementation version: " +
queueFactory.getClass().getPackage().getImplementationVersion() + "*****************\n\n\n");
queueFactory.setHostName(host);
queueFactory.setPort(port);
queueFactory.setQueueManager(queueManager);
queueFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
if (queueFactory.getClass().getPackage().getImplementationVersion().split("\\.")[0].equals("9")) {
queueFactory.setProviderVersion("6");
//queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
}
XASession xaSession;
javax.jms.QueueConnection xaQueueConnection;
try {
// Obtain a QueueConnection
System.out.println("Creating Connection...");
xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
xaQueueConnection.start();
for (int counter=0; counter<2; counter++) {
try {
xaSession = ((XAConnection)xaQueueConnection).createXASession();
xaSession.close();
} catch (Exception ex) {
System.out.println(ex);
}
}
System.out.println("Closing connection.... ");
xaQueueConnection.close();
} catch (Exception e) {
System.out.println("Error processing " + e.getMessage());
}
}
}
- [наблюдения] v6_client создал и закрыл только один сокет, а v9_client (оба 9.0.0. [1/5]):
- сокет создан и закрыт сразу после
xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
- во внутреннем цикле for, сокет создан сразу после
xaSession = ((XAConnection)xaQueueConnection).createXASession();
и закрыт после xaSession.close();
Наивно я ожидал, что сокет останетсяоткрывать до xaQueueConnection.close()
.
[update-2] Используя queueFactory.setProviderVersion("9");
и queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
для v9_server + v9_client, мы не видим ту же самую проблему закрытия константного сокета в v6_server + v9_client, котораяхорошая новость
[update-3] MCAUSER
on для всех каналов SVRCONN
на v6_server.То же самое на v9_server (у которого нет той же проблемы закрытия сокета при подключении с тем же v9_client).
display channel (SYSTEM.ADMIN.SVRCONN)
MCAUSER(mqm)
display channel (SYSTEM.AUTO.SVRCONN)
MCAUSER( )
display channel (SYSTEM.DEF.SVRCONN)
MCAUSER( )
[update-4]
Я попытался установить MCAUSER () к mqm
, затем используя
(пусто) и mqm
со стороны клиента, оба могут создавать соединения, но все равно видят одно и то же неожиданное закрытие сокета, используя v9_client + v6_user.После обновления MCAUSER () я всегда добавлял refresh security
и перезапускал qmgr.
Я также пытался установить пустую системную переменную в eclipse перед созданием соединения с пустым пользователем, тоже не помогло.
[update-5]
Ограничение нашего обсуждения v9_client + v9_server.Приведенный ниже код асинхронного тестирования генерирует тонну запросов на создание / закрытие xasession, используя ограниченное количество существующих соединений.Используя SHARECNV (1), мы также получим недоступное большое количество TIME_WAIT, но использование более 1 SHARECNV (например, 10) может привести к дополнительному снижению производительности ......
Асинхронный код тестирования
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import javax.jms.*;
import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;
public class MQSeries_simpleAuditQ_Async_v9 {
private static String queueManager = "QM.ALPQ.ALL.01";
private static int port = 26010;
private static String host = "localhost";
private static int connCount = 20;
private static int amp = 100;
private static ExecutorService amplifier = Executors.newFixedThreadPool(amp);
public static void main(String[] args) throws JMSException {
MQXAQueueConnectionFactory queueFactory= new MQXAQueueConnectionFactory();
System.out.println("\n\n\n*******************\nqueueFactory implementation version: " +
queueFactory.getClass().getPackage().getImplementationVersion() + "*****************\n\n\n");
queueFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
if (queueFactory.getClass().getPackage().getImplementationVersion().split("\\.")[0].equals("9")) {
queueFactory.setProviderVersion("9");
queueFactory.setShareConvAllowed(WMQConstants.WMQ_SHARE_CONV_ALLOWED_YES);
}
queueFactory.setHostName(host);
queueFactory.setPort(port);
queueFactory.setQueueManager(queueManager);
//queueFactory.setChannel("");
ArrayList<QueueConnection> xaQueueConnections = new ArrayList<QueueConnection>();
try {
// Obtain a QueueConnection
System.out.println("Creating Connection...");
//System.setProperty("user.name", "mqm");
//System.out.println("system username: " + System.getProperty("user.name"));
for (int ct=0; ct<connCount; ct++) {
// xaQueueConnection instance of MQXAQueueConnection
QueueConnection xaQueueConnection = (QueueConnection)queueFactory.createXAConnection(" ", "");
xaQueueConnection.start();
xaQueueConnections.add(xaQueueConnection);
}
ArrayList<Double> totalElapsedTimeRecord = new ArrayList<Double>();
ArrayList<FutureTask<Double>> taskBuffer = new ArrayList<FutureTask<Double>>();
for (int loop=0; loop <= 10; loop++) {
try {
for (int i=0; i<amp; i++) {
int idx = (int)(Math.random()*((connCount)));
System.out.println("Using connection: " + idx);
FutureTask<Double> xaSessionPoker = new FutureTask<Double>(new XASessionPoker(xaQueueConnections.get(idx)));
amplifier.submit(xaSessionPoker);
taskBuffer.add(xaSessionPoker);
}
System.out.println("loop " + loop + " completed");
} catch (Exception ex) {
System.out.println(ex);
}
}
for (FutureTask<Double> xaSessionPoker : taskBuffer) {
totalElapsedTimeRecord.add(xaSessionPoker.get());
}
System.out.println("Average xaSession poking time: " + calcAverage(totalElapsedTimeRecord));
System.out.println("Closing connections.... ");
for (QueueConnection xaQueueConnection : xaQueueConnections) {
xaQueueConnection.close();
}
} catch (Exception e) {
System.out.println("Error processing " + e.getMessage());
}
amplifier.shutdown();
}
private static double calcAverage(ArrayList<Double> myArr) {
double sum = 0;
for (Double val : myArr) {
sum += val;
}
return sum/myArr.size();
}
// create and close session through QueueConnection object passed in.
private static class XASessionPoker implements Callable<Double> {
// conn instance of MQXAQueueConnection. ref. QueueProviderService
private QueueConnection conn;
XASessionPoker(QueueConnection conn) {
this.conn = conn;
}
@Override
public Double call() throws Exception {
XASession xaSession;
double elapsed = 0;
try {
final long start = System.currentTimeMillis();
// ref. DualSessionWrapper
// xaSession instance of MQXAQueueSession
xaSession = ((XAConnection) conn).createXASession();
xaSession.close();
final long end = System.currentTimeMillis();
elapsed = (end - start) / 1000.0;
} catch (Exception e) {
// TODO Auto-generated catch block
System.out.println(e);
}
return elapsed;
}
}
}