Я использую Stomp.py для подключения к стандартному серверу ACtiveMQ. Я моделирую случаи, когда приемник выходит из строя, и я хочу иметь возможность перезапустить его, и он продолжит работу из сообщения после того, которое вызвало его ошибку sh.
Я создал два образца сценария:
- putMessagesToQueue.py - это поместит 56 сообщений в место назначения
- readMessagesFromQueue.py - Это будет читать сообщения из места назначения. Если он прочитает 6-е сообщение, это вызовет исключение. Обработка каждого сообщения занимает 1 секунду
Шаги, которые я предпринимаю для запуска теста:
- Я запускаю putMessagesToQueue.py
- Я запускаю readMessagesFromQueue.py - он успешно обрабатывает 5 сообщений, и в сообщении 6 возникает исключение
- Я завершаю readMessagesFromQueue.py (ctrl- c)
- Я снова запускаю readMessagesFromQueue.py
Для поведения, которое я хочу на шаге 4, я хочу, чтобы он начал обработку с сообщения 7.
Однако я этого не вижу. Если получатель подписывается с помощью ack = 'auto', то на шаге 4 он не обрабатывает сообщения - все сообщения исчезли из очереди, и я потерял 50 сообщений!
Если я использую ack = 'client' или ack = 'client-Individual', то на шаге 4 он снова запускается с самого начала, а затем снова вылетает в сообщении 6.
Кажется, это предполагает, что получатель не обрабатывает сообщения одновременно, вместо этого он принимает каждое отдельное сообщение. сообщение сразу и просматривая каждое. Я не хочу такого поведения, потому что я хотел бы масштабировать до 5 приемников и распределить нагрузку. В настоящий момент первый получатель, который я запускаю, принимает все сообщения и начинает их перебирать, а получатели 2-4 просто ждут новых сообщений. Я хочу, чтобы получатели принимали сообщения по одному!
Может ли кто-нибудь дать какие-нибудь подсказки, как я реализую это неправильно:
Source
putMessagesToQueue.py
import stomp
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
for x in range(0,5):
conn.send(body="OK-BEFORE-CRASH", destination=destination)
conn.send(body="CRASH", destination=destination)
for x in range(0,50):
conn.send(body="OK-AFTER-CRASH", destination=destination)
readMessagesFromQueue.py
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
def __init__(self, processMessage):
self.processMessage = processMessage
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
self.processMessage(headers, message)
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='auto')
#conn.subscribe(destination=destination, id=1, ack='client')
#conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")
Обновление 001
Мне удалось получить желаемое поведение, описанное выше, изменив функцию приема на использование ack = 'client-Individual' и вручную отправлять сообщения подтверждения. (См. Новую версию ниже)
Но я все еще не могу заставить получателей обрабатывать одно сообщение за раз. Это можно продемонстрировать в следующих шагах:
- Я запускаю putMessagesToQueue.py
- Я запускаю readMessagesFromQueue2.py - он начинает обработку
- При новом запуске терминала readMessagesFromQueue2.py
Сначала второй readMessagesFromQueue2 ничего не делает, пока первый не выйдет из строя, а затем он начинает получать сообщения. Я хочу, чтобы оба экземпляра получателя читали сообщения с самого начала.
readMessagesFromQueue2.py
import stomp
import time
stompurl = "127.0.0.1"
stompport = "61613"
stompuser = "admin"
stomppass = "admin"
destination = "/queue/testQueueWithCrash"
conn = stomp.Connection(host_and_ports=[(stompurl, stompport)])
conn.connect(stompuser,stomppass,wait=True)
class StompConnectionListenerClass(stomp.ConnectionListener):
processMessage = None
conn = None
def __init__(self, processMessage, conn):
self.processMessage = processMessage
self.conn = conn
def on_error(self, headers, message):
print('XX received an error "%s"' % message)
def on_message(self, headers, message):
try:
self.processMessage(headers, message)
finally:
self.conn.ack(id=headers["message-id"], subscription=headers["subscription"])
def messageProcessingFunction(headers, message):
print('Main recieved a message "%s"' % message)
if (message=="CRASH"):
print("Message told processor to crash")
raise Exception("Reached message which crashes reciever")
time.sleep(1) # simulate processing message taking time
stompConnectionListener = StompConnectionListenerClass(processMessage=messageProcessingFunction, conn=conn)
conn.set_listener('', stompConnectionListener)
print("Subscribing")
conn.subscribe(destination=destination, id=1, ack='client-individual')
print("Terminate loop starting (Press ctrl+c when you want to exit)")
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print('interrupted - so exiting!')
conn.close()
print("Reciever terminated")