У меня есть очередь сообщений с использованием ActiveMQ. Веб-запрос помещает сообщения в очередь с постоянством = True. Теперь у меня есть 2 потребителя, которые оба подключены как отдельные сеансы к этой очереди. Потребитель 1 всегда подтверждает сообщение, а потребитель 2 никогда не делает.
Теперь я прочитал это http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html, в котором говорится:
Очередь JMS реализует загрузку семантика балансировки. Одно сообщение будет получено ровно одним потребителем. Если на момент отправки сообщения нет доступных потребителей, оно будет храниться до тех пор, пока не будет доступен потребитель, который сможет обработать сообщение. Если потребитель получит сообщение и не подтвердит его до закрытия, сообщение будет доставлено другому потребителю. В очереди может быть много потребителей с балансировкой нагрузки сообщений между доступными потребителями.
Из этого я понимаю, что я ожидаю, что все сообщения в конечном итоге будут обработаны потребителем 1, поскольку он всегда подтверждает это. Поскольку потребитель 2 не подтверждает, сообщение должно быть отправлено потребителю 1.
Но я замечаю следующее: 1. Когда я отправляю запрос, я вижу только каждый второй запрос, поступающий к потребителю 1 Другой запрос не отображается и сохраняется в ActiveMQ. Я предполагаю, что это пошло к потребителю 2, который не признал. Так должно ли это дойти до потребителя 1?
Мне просто нужно убедиться, что сообщение обрабатывается только одним потребителем. В моем случае этот потребитель - машина в стране (сайте) X. Каждое сообщение должно обрабатываться только в одной стране (машине). Но все страны (машины) должны получить сообщение. Если идентификатор страны совпадает в сообщении, оно будет подтверждено. Таким образом, будет отправлено только 1 подтверждение / сообщение.
Мой код для получения / обработки сообщений выглядит следующим образом:
# --------------------------------------------- MODULE IMPORT ---------------------------------------------------------#
import argparse
import json
import logging
import multiprocessing as mp
import sys
import stomp
from tvpv_portal.services.msgbkr import MsgBkr
from utils import util
# --------------------------------------------- DEVELOPMENT CODE ------------------------------------------------------#
log = logging.getLogger(__name__)
class MessageProcessingListener(stomp.ConnectionListener):
"""This class is responsible for processing (consuming) the messages from ActiveMQ."""
def __init__(self, conn, cb):
"""Initialization.
Args:
conn -- Connection object
cb -- Callback function
"""
self._conn = conn
self._cb = cb
def on_error(self, headers, body):
"""When we get an error.
Args:
headers -- Message header
body -- Message body
"""
log.error('Received error=%s', body)
def on_message(self, headers, body):
"""When we receive a message.
Args:
headers -- Message header
body -- Message body
"""
log.info('Received message')
# Deserialize the message.
item = json.loads(body)
import pprint
pprint.pprint(item)
# TODO: check if msg is to be handled by this SITE. If so, acknowledge and queue it. Otherwise, ignore.
# Put message into queue via callback (queue.put) function.
#self._cb(item)
# TODO: we only send acknowledge if we are supposed to process this message.
# Send acknowledgement to ActiveMQ indicating message is consumed.
self._conn.ack(headers['message-id'], headers['subscription'])
def worker(q):
"""Worker to retrieve item from queue and process it.
Args:
q -- Queue
"""
# Run in an infinite loop. Get an item from the queue to process it. We MUST call q.task_done() to indicate
# that item is processed to prevent deadlock.
while True:
try:
item = q.get()
# TODO: We will call external script from here to run on Netbatch in the future.
log.info('Processed message')
finally:
q.task_done()
def flash_mq_rst_handler_main():
"""Main entry to the request handler."""
# Define arguments.
parser = argparse.ArgumentParser(description='Flash message queue request handler script',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=False)
opts = parser.add_argument_group('Options')
opts.add_argument('-h', '--help', action='help',
help='Show this help message and exit')
opts.add_argument('--workers', metavar='val', type=int, default=4,
help='Number of worker processes')
opts.add_argument('--log', metavar='file', type=util.get_resolved_abspath, default='flash_mq_rst_handler.log',
help='Log file')
# Parse arguments.
args = parser.parse_args()
# Setup logger.
util.configure_logger(args.log)
log.info('Command line %s', ' '.join(map(str, sys.argv)))
# Create a managed queue to store messages retrieved from message queue.
queue = mp.Manager().JoinableQueue()
# Instantiate consumer message broker + ensure connection.
consumer = MsgBkr(producer=False)
if not consumer.is_connected():
log.critical('Unable to connect to message queue; please debug')
sys.exit(1)
# Register listener with consumer + queue.put as the callback function to trigger when a message is received.
consumer.set_listener('message_processing_listener', MessageProcessingListener, cb=queue.put)
# Run in an infinite loop to wait form messages.
try:
log.info('Create pool with worker=%d to process messages', args.workers)
with mp.Pool(processes=args.workers) as pool:
p = pool.apply_async(worker, (queue,))
p.get()
except KeyboardInterrupt:
pass
# See MsgBkr. It will close the connection during exit() so we don't have to do it.
sys.exit(0)
if __name__ == '__main__':
flash_mq_rst_handler_main()