Я тестирую производительность IBM MQ (запускаю последнюю версию в локальном контейнере docker). Я использую постоянную очередь.
На стороне производителя я могу повысить производительность, запустив несколько параллельное создание приложений.
Однако на стороне потребителя я не могу увеличить пропускную способность, распараллеливая процессы пользователя. Напротив, пропускная способность для нескольких потребителей даже хуже, чем для одного.
Что может быть причиной низкой производительности потребления?
Это не должно быть связано с аппаратным ограничением, так как я сравниваю потребление с производством, и я делал только потребление сообщений без какой-либо другой обработки.
Выполняет ли GET принятие для каждое сообщение? Я не нахожу в PyMQI никакого явного метода фиксации.
put_demo.py
#!/usr/bin/env python3
import pymqi
import time
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
message = b'Hello from Python!'
conn_info = '%s(%s)' % (host, port)
nb_messages = 1000
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)
for i in range(nb_messages):
try:
queue.put(message)
except pymqi.MQMIError as e:
print(f"Fatal error: {str(e)}")
queue.close()
qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages/(t1-t0):.0f} nb_message_produced: {nb_messages}")
get_demo.py
#!/usr/bin/env python3
import pymqi
import time
import os
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
conn_info = '%s(%s)' % (host, port)
nb_messages = 1000
nb_messages_consumed = 0
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)
gmo = pymqi.GMO(Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING)
gmo.WaitInterval = 1000
while nb_messages_consumed < nb_messages:
try:
msg = queue.get(None, None, gmo)
nb_messages_consumed += 1
except pymqi.MQMIError as e:
if e.reason == 2033:
# No messages, that's OK, we can ignore it.
pass
queue.close()
qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_consumed/(t1-t0):.0f} nb_messages_consumed: {nb_messages_consumed}")
run results
> for i in {1..10}; do ./put_demo.py & done
tps: 385 nb_message_produced: 1000
tps: 385 nb_message_produced: 1000
tps: 383 nb_message_produced: 1000
tps: 379 nb_message_produced: 1000
tps: 378 nb_message_produced: 1000
tps: 377 nb_message_produced: 1000
tps: 377 nb_message_produced: 1000
tps: 378 nb_message_produced: 1000
tps: 374 nb_message_produced: 1000
tps: 374 nb_message_produced: 1000
> for i in {1..10}; do ./get_demo.py & done
tps: 341 nb_messages_consumed: 1000
tps: 339 nb_messages_consumed: 1000
tps: 95 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
tps: 82 nb_messages_consumed: 1000
get_demo.py updated version using syncpoint and batch commit
#!/usr/bin/env python3
import pymqi
import time
import os
queue_manager = 'QM1'
channel = 'DEV.APP.SVRCONN'
host = '127.0.0.1'
port = '1414'
queue_name = 'DEV.QUEUE.1'
conn_info = '%s(%s)' % (host, port)
nb_messages = 1000
commit_batch = 10
nb_messages_consumed = 0
t0 = time.time()
qmgr = pymqi.connect(queue_manager, channel, conn_info)
queue = pymqi.Queue(qmgr, queue_name)
gmo = pymqi.GMO(Options = pymqi.CMQC.MQGMO_WAIT | pymqi.CMQC.MQGMO_FAIL_IF_QUIESCING | pymqi.CMQC.MQGMO_SYNCPOINT)
gmo.WaitInterval = 1000
while nb_messages_consumed < nb_messages:
try:
msg = queue.get(None, None, gmo)
nb_messages_consumed += 1
if nb_messages_consumed % commit_batch == 0:
qmgr.commit()
except pymqi.MQMIError as e:
if e.reason == 2033:
# No messages, that's OK, we can ignore it.
pass
queue.close()
qmgr.disconnect()
t1 = time.time()
print(f"tps: {nb_messages_consumed/(t1-t0):.0f} nb_messages_consumed: {nb_messages_consumed}")
Спасибо.