Stomp.py не обрабатывает одно сообщение за раз - PullRequest
0 голосов
/ 05 мая 2020

Я использую Stomp.py для подключения к стандартному серверу ACtiveMQ. Я моделирую случаи, когда приемник выходит из строя, и я хочу иметь возможность перезапустить его, и он продолжит работу из сообщения после того, которое вызвало его ошибку sh.

Я создал два образца сценария:

  • putMessagesToQueue.py - это поместит 56 сообщений в место назначения
  • readMessagesFromQueue.py - Это будет читать сообщения из места назначения. Если он прочитает 6-е сообщение, это вызовет исключение. Обработка каждого сообщения занимает 1 секунду

Шаги, которые я предпринимаю для запуска теста:

  1. Я запускаю putMessagesToQueue.py
  2. Я запускаю readMessagesFromQueue.py - он успешно обрабатывает 5 сообщений, и в сообщении 6 возникает исключение
  3. Я завершаю readMessagesFromQueue.py (ctrl- c)
  4. Я снова запускаю readMessagesFromQueue.py

Для поведения, которое я хочу на шаге 4, я хочу, чтобы он начал обработку с сообщения 7.

Однако я этого не вижу. Если получатель подписывается с помощью ack = 'auto', то на шаге 4 он не обрабатывает сообщения - все сообщения исчезли из очереди, и я потерял 50 сообщений!

Если я использую ack = 'client' или ack = 'client-Individual', то на шаге 4 он снова запускается с самого начала, а затем снова вылетает в сообщении 6.

Кажется, это предполагает, что получатель не обрабатывает сообщения одновременно, вместо этого он принимает каждое отдельное сообщение. сообщение сразу и просматривая каждое. Я не хочу такого поведения, потому что я хотел бы масштабировать до 5 приемников и распределить нагрузку. В настоящий момент первый получатель, который я запускаю, принимает все сообщения и начинает их перебирать, а получатели 2-4 просто ждут новых сообщений. Я хочу, чтобы получатели принимали сообщения по одному!

Может ли кто-нибудь дать какие-нибудь подсказки, как я реализую это неправильно:

Source

putMessagesToQueue.py

import stomp

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

for x in range(0,5):
  conn.send(body="OK-BEFORE-CRASH", destination=destination)

conn.send(body="CRASH", destination=destination)

for x in range(0,50):
  conn.send(body="OK-AFTER-CRASH", destination=destination)

readMessagesFromQueue.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  def __init__(self, processMessage):
    self.processMessage = processMessage
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    self.processMessage(headers, message)

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")

Обновление 001

Мне удалось получить желаемое поведение, описанное выше, изменив функцию приема на использование ack = 'client-Individual' и вручную отправлять сообщения подтверждения. (См. Новую версию ниже)

Но я все еще не могу заставить получателей обрабатывать одно сообщение за раз. Это можно продемонстрировать в следующих шагах:

  1. Я запускаю putMessagesToQueue.py
  2. Я запускаю readMessagesFromQueue2.py - он начинает обработку
  3. При новом запуске терминала readMessagesFromQueue2.py

Сначала второй readMessagesFromQueue2 ничего не делает, пока первый не выйдет из строя, а затем он начинает получать сообщения. Я хочу, чтобы оба экземпляра получателя читали сообщения с самого начала.

readMessagesFromQueue2.py

import stomp
import time

stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"

conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)

class StompConnectionListenerClass(stomp.ConnectionListener):
  processMessage = None
  conn = None
  def __init__(self, processMessage, conn):
    self.processMessage = processMessage
    self.conn = conn
  def on_error(self, headers, message):
    print('XX received an error "%s"' % message)
  def on_message(self, headers, message):
    try:
      self.processMessage(headers, message)
    finally:
      self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])

def messageProcessingFunction(headers, message):
  print('Main recieved a message "%s"' % message)
  if (message=="CRASH"):
    print("Message told processor to crash")
    raise Exception("Reached message which crashes reciever")
  time.sleep(1) # simulate processing message taking time

stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)

print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')

print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
    while True:
        time.sleep(10)
except KeyboardInterrupt:
    print('interrupted - so exiting!')

conn.close()

print("Reciever terminated")

1 Ответ

1 голос
/ 05 мая 2020

Я много читал разные документы, и я обнаружил проблему.

ActiveMQ имеет размер предварительной выборки - https://svn.apache.org/repos/infra/websites/production/activemq/content/5.7.0/what-is-the-prefetch-limit-for.html

Если у вас мало сообщений, которые принимают длительное время обработки, вы можете установить его на 1. Это не подходит в других ситуациях.

Я могу сделать это в stopm.py с помощью следующей строки: conn.subscribe (destination = destination, id = 1 , ack = 'client-Individual', headers = {'activemq.prefetchSize': 1})

Таким образом, использование ручного или автоматического подтверждения не было ни здесь, ни там. Ключ ограничивает предварительную выборку до 1.

...