Производительность JMS и RemoteTCPConnection.receive - PullRequest
0 голосов
/ 03 марта 2020

После распараллеливания пользовательских потоков JMS я не получил желаемого прироста производительности.
В jvisualvm я вижу следующие jms-материалы, чтобы быть «победителями» в колонке CPU с собственным временем.

com.ibm.mq.jmqi.remote.impl.RemoteTCPConnection.receive()

Вот полная трассировка стека того, что фактически делают потоки.

"RcvThread: com.ibm.mq.jmqi.remote.impl.RemoteTCPConnection@9167029[qmid=MWTEST53_2019-06-10_12.32.59,fap=12,channel=MA                  ,ccsid=1208,sharecnv=10,hbint=300,peer=MWTEST/162.11.56.86(1453),localport=61965,ssl=no,hConns=0,LastDataSend=1583310588163 (16ms ago),LastDataRecv=1583310588163 (16ms ago),]" - Thread t@651
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at java.net.SocketInputStream.read(Unknown Source)
        at com.ibm.mq.jmqi.remote.impl.RemoteTCPConnection.receive(RemoteTCPConnection.java:1535)
        at com.ibm.mq.jmqi.remote.impl.RemoteRcvThread.receiveBuffer(RemoteRcvThread.java:794)
        at com.ibm.mq.jmqi.remote.impl.RemoteRcvThread.receiveOneTSH(RemoteRcvThread.java:757)
        at com.ibm.mq.jmqi.remote.impl.RemoteRcvThread.run(RemoteRcvThread.java:150)
        at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
        - None

"jms_reader_thread_9" - Thread t@63
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <711f08> (a com.ibm.mq.jmqi.remote.impl.RemoteProxyQueue$GetterEventMonitor)
        at com.ibm.mq.jmqi.remote.impl.RemoteProxyQueue.proxyMQGET(RemoteProxyQueue.java:2492)
        at com.ibm.mq.jmqi.remote.api.RemoteFAP.jmqiGetInternalWithRecon(RemoteFAP.java:6536)
        at com.ibm.mq.jmqi.remote.api.RemoteFAP.jmqiGetInternal(RemoteFAP.java:6432)
        at com.ibm.mq.jmqi.internal.JmqiTools.getMessage(JmqiTools.java:1259)
        at com.ibm.mq.jmqi.remote.api.RemoteFAP.jmqiGet(RemoteFAP.java:6395)
        at com.ibm.mq.ese.jmqi.InterceptedJmqiImpl.jmqiGet(InterceptedJmqiImpl.java:910)
        at com.ibm.mq.ese.jmqi.ESEJMQI.jmqiGet(ESEJMQI.java:362)
        at com.ibm.mq.MQDestination.internalGetMessage(MQDestination.java:1012)
        - locked <1a7d0fa> (a com.ibm.mq.MQQueue)
        at com.ibm.mq.MQDestination.getInt(MQDestination.java:585)
        - locked <1a7d0fa> (a com.ibm.mq.MQQueue)
        at com.ibm.mq.MQDestination.get(MQDestination.java:456)
        at com.company.app.connection.MqConnection.nextRecordObject(MqConnection.java:351)
        at com.company.app.source.AppMqReader.next(AppMqReader.java:37)
        at com.company.app.source.AppJMSReadConnector.delegateNext(AppJMSReadConnector.java:202)
        at com.company.app.source.LegacyReadConnector.readNextWithRetry(LegacyReadConnector.java:114)
        at com.company.app.source.LegacyReadConnector.processNext(LegacyReadConnector.java:68)
        at com.company.app.source.AppJMSReadConnector.processNext(AppJMSReadConnector.java:435)
        at com.company.app.source.AppReadConnector.next(AppReadConnector.java:131)
        at com.company.app.source.AppReadConnector.next(AppReadConnector.java:114)
        at com.company.app.configuration.jaxb.ReadPipeline.run(ReadPipeline.java:262)
        at com.company.app.configuration.jaxb.AppAdaptor.run(AppAdaptor.java:684)
        at com.company.app.configuration.jaxb.RouteService.runOneIteration(RouteService.java:78)
        at com.google.common.util.concurrent.AbstractScheduledService$ServiceDelegate$Task.run(AbstractScheduledService.java:195)
        at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:482)
        at com.google.common.util.concurrent.AbstractScheduledService$CustomScheduler$ReschedulableCallable.call(AbstractScheduledService.java:448)
        at com.google.common.util.concurrent.Callables$3.call(Callables.java:89)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
        - locked <cc8421> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

        - locked <15571ee> (a java.util.concurrent.ThreadPoolExecutor$Worker)

Поток ждет! Почти все из 10 потоков находятся в состоянии TIMED_WAITING. Вот фрагмент кода метода receive() logi c

synchronized (this.getterEventMonitor) {
    if ((this.status & 0x4) == 0) {
        if (waitInterval > 0) {
            this.getterEventMonitor.wait(waitInterval);
        }
        else {

            this.getterEventMonitor.wait();
        }

    }
} 

Статус 0x4 должен быть ST_GETTER_SIGNALLED.

Вопросы: что такое логика c позади ожидания состояния ST_GETTER_SIGNALLED, что на самом деле делает поток и как я могу сделать это быстрее?

Ответы [ 2 ]

0 голосов
/ 09 марта 2020

Вам необходимо понять основы производительности ... Часть работы либо выполняется (с использованием ЦП), либо ожидает - без использования ЦП.

Если нет работы, то поток будет быть в ожидании. Внутренне он может просыпаться, периодически проверять состояние и go снова переходить в спящий режим. В этом случае «горячие» процедуры, использующие ЦП, будут связаны с синхронизацией. Обычно приложение MQ выполняет синхронизированное ожидание - MQGET WAIT в течение 10 секунд - поэтому вы увидите, что «находятся в состоянии TIMED_WAITING.»


Если вы видите, что потоки находятся в состоянии ожидания - тогда вам нужно Узнайте, почему сообщения не доставляются в ваши темы. Если ваши потоки заняты и тратят более 25% на использование ЦП - возможно, вы захотите проверить, почему они используют так много ЦП, например, выполняют ли они преобразование или расшифровку данных.


Некоторые основы I Предположим, что эти темы потребляют сообщения. Есть ли сообщения, которые он может потреблять - например, отображать полноту очереди. Если оно меньше 10 - тогда некоторые ваши темы будут ждать. Если curdepth больше 10, тогда все потоки должны иметь возможность запускать и обрабатывать сообщения.

Являются ли сообщения постоянными или непостоянными. Если они не являются постоянными, то запись на диск не ведется, и получение сообщения должно занимать миллисекунды, поэтому вы должны видеть сотни сообщений в секунду, обрабатываемых в очереди. Если они являются постоянными, запись в журнал на диске будет выполняться либо при выходе из syncpoint, либо при фиксации - это может составлять 1-10 (или более) миллисекунд. Так что ваша задача будет тратить большую часть своего времени на ожидание.

Спуск по уровню. В зависимости от вашей версии MQ. Если вы выполняете посылку или получение постоянных сообщений вне точки синхронизации (в распределенном MQ), то блокируется доступ к запросу для сериализации доступа к очереди. Это может ограничить вас до 100 сообщений в секунду. См. Здесь https://developer.ibm.com/messaging/2018/04/24/implicit-syncpointing-persistent-messages-put-outside-syncpoint/

Вам также необходимо подумать о том, являются ли сообщения «получить указанными c» или «получить любые». Например, ваше приложение выполняет MQPUT для сервера, а затем ждет ответа. (Так называемый запрос-ответ или модель клиента). Даже если в очереди есть сообщения, ваша ветка будет ожидать определенного сообщения c. если ваше приложение является сервером, «получите первое сообщение в очереди, обработайте его и отправьте ответ обратно». Тогда Любое доступное сообщение может быть обработано.

0 голосов
/ 04 марта 2020

Это немного больше, чем комментарий и, имхо, немного меньше, чем ответ, но так как здесь слишком много текста для комментария, то здесь.

Вам действительно нужно описать, когда ваши потоки создаются и выполняются, поскольку очень трудно понять, что может пойти не так, но основываясь на вашем утверждении, что потоки в основном находятся в состоянии ожидания, тогда ...

... может случиться так, что если все 10 ваших потоков будут иметь одинаковые synchronised logi c, то в любой момент 9 из них будут заблокированы из это логи c. 10 числа будет ждать, пока ваш чек на (this.status & 0x4) == 0) будет успешным.

ie. если ваш 10-й поток находится в этой логике c и ожидает, все остальные 9 будут ждать, пока 10-й поток перестанет ждать.

Я думаю, вам нужно переработать логи c, чтобы иметь один поток диспетчера, который отслеживает события, а затем отправляет единицу работы ожидающим потокам.

...