Долгосрочные привязки очереди RabbitMQ - PullRequest
0 голосов
/ 12 февраля 2019

Я пытаюсь надежно отправить сообщение от издателя нескольким потребителям, используя обмен RabbitMQ topic.

Я настроил долговременные очереди (по одной на потребителя) и отправляю постоянные сообщения delivery_mode=2.Я также устанавливаю канал в режиме confim_delivery и добавил флаг mandatory=True для публикации.

В настоящее время служба довольно надежна, но сообщения теряются для одного из потребителей, если она не работает во времяперезапуск посредника с последующей публикацией сообщения.

Кажется, что посредник может восстанавливать очереди и сообщения при перезапуске, но, похоже, он не сохраняет связь между потребителями и очередями.Таким образом, сообщения доходят только до одного из потребителей и теряются из-за того, что он не работает.

Примечание: Сообщения достигают очереди и потребителя, если посредник не переносит перезапуск во времявремя, когда потребитель не работает.Они накапливаются должным образом в очереди и доставляются потребителю, когда он снова включается.

Редактировать - добавляя код потребителя:

import pika


class Consumer(object):
    def __init__(self, queue_name):
        self.queue_name = queue_name

    def consume(self):
        credentials = pika.PlainCredentials(
             username='myuser', password='mypassword')
        connection = pika.BlockingConnection(
             pika.ConnectionParameters(host='myhost', credentials=credentials))
        channel = connection.channel()
        channel.exchange_declare(exchange='myexchange', exchange_type='topic')
        channel.queue_declare(queue=self.queue_name, durable=True)
        channel.queue_bind(
            exchange='myexchange', queue=self.queue_name, routing_key='my.route')
        channel.basic_consume(
            consumer_callback=self.message_received, queue=self.queue_name)
        channel.start_consuming()

    def message_received(self, channel, basic_deliver, properties, body):
        print(f'Message received: {body}')
        channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)

Можно предположить, что каждыйпотребительский сервер делает что-то похожее на:

c = Consumer('myuniquequeue')  # each consumer has a permanent queue name
c.consume()

Редактировать - добавить код издателя:

def publish(message):
    credentials = pika.PlainCredentials(
        username='myuser', password='mypassword')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='myhost', credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='myexchange', exchange_type='topic')
    channel.confirm_delivery()
    success = channel.basic_publish(
        exchange='myexchange',
        routing_key='my.route',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ),
        mandatory=True
    )
    if success:
        print("Message sent")
    else:
        print("Could not send message")
        # Save for sending later

Стоит сказать, что я обрабатываю ошибку на моемсобственный, и это не та часть, которую я хотел бы улучшить.Когда мои сообщения теряются для некоторых потребителей, поток проходит через раздел успеха

1 Ответ

0 голосов
/ 14 февраля 2019

Используйте basic.ack(delivery_tag=basic_deliver.delivery_tag) в методе обратного вызова потребителя.Это подтверждение указывает, получил ли потребитель сообщение и обработал его или нет.Если это отрицательное подтверждение, сообщение будет помещено в очередь.

Edit # 1 Чтобы получать сообщения во время сбоя брокера, брокер должен быть распределен.Это концепция, называемая зеркальными очередями в RabbitMQ.Mirrored Queues позволяет реплицировать ваши очереди на узлы в вашем кластере.Если один из узлов, содержащих очередь, выйдет из строя, другой узел, содержащий очередь, будет выступать в качестве вашего брокера.

Для полного понимания обратитесь к этому Зеркальные очереди

...