Я пытаюсь надежно отправить сообщение от издателя нескольким потребителям, используя обмен RabbitMQ topic
.
Я настроил долговременные очереди (по одной на потребителя) и отправляю постоянные сообщения delivery_mode=2
.Я также устанавливаю канал в режиме confim_delivery
и добавил флаг mandatory=True
для публикации.
В настоящее время служба довольно надежна, но сообщения теряются для одного из потребителей, если она не работает во времяперезапуск посредника с последующей публикацией сообщения.
Кажется, что посредник может восстанавливать очереди и сообщения при перезапуске, но, похоже, он не сохраняет связь между потребителями и очередями.Таким образом, сообщения доходят только до одного из потребителей и теряются из-за того, что он не работает.
Примечание: Сообщения достигают очереди и потребителя, если посредник не переносит перезапуск во времявремя, когда потребитель не работает.Они накапливаются должным образом в очереди и доставляются потребителю, когда он снова включается.
Редактировать - добавляя код потребителя:
import pika
class Consumer(object):
def __init__(self, queue_name):
self.queue_name = queue_name
def consume(self):
credentials = pika.PlainCredentials(
username='myuser', password='mypassword')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='myhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='myexchange', exchange_type='topic')
channel.queue_declare(queue=self.queue_name, durable=True)
channel.queue_bind(
exchange='myexchange', queue=self.queue_name, routing_key='my.route')
channel.basic_consume(
consumer_callback=self.message_received, queue=self.queue_name)
channel.start_consuming()
def message_received(self, channel, basic_deliver, properties, body):
print(f'Message received: {body}')
channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
Можно предположить, что каждыйпотребительский сервер делает что-то похожее на:
c = Consumer('myuniquequeue') # each consumer has a permanent queue name
c.consume()
Редактировать - добавить код издателя:
def publish(message):
credentials = pika.PlainCredentials(
username='myuser', password='mypassword')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='myhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='myexchange', exchange_type='topic')
channel.confirm_delivery()
success = channel.basic_publish(
exchange='myexchange',
routing_key='my.route',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
mandatory=True
)
if success:
print("Message sent")
else:
print("Could not send message")
# Save for sending later
Стоит сказать, что я обрабатываю ошибку на моемсобственный, и это не та часть, которую я хотел бы улучшить.Когда мои сообщения теряются для некоторых потребителей, поток проходит через раздел успеха