Почему сообщения в очереди RabbitMQ теряются при перезапуске потребителя? - PullRequest
2 голосов
/ 03 мая 2019

Я настроил получателя RabbitMQ следующим образом:

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(self.handle_delivery, queue=self.queue_name, no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body

        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))

            # Do something
            sleep(randint(10, 100))

            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))

        except Exception as err:
            _logger.exception(err)


if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)

    except Exception as err:
        _logger.exception(err)

У меня также есть издатель очереди, как показано ниже:

import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)


if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000

    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))

Когда я публикую 10000 сообщений в очередии потребитель не запускается, через rabbitmqctl list_queues Я вижу, что test_queue имеет 10000 сообщений.Когда я запускаю потребителя, я запускаю rabbitmqctl list_queues и вижу, что в очереди 0 сообщений.Однако потребитель все еще потребляет сообщения из очереди.Проблема в том, что когда я останавливаю потребителя через несколько секунд, а затем перезапускаю его, я не могу восстановить свои сообщения.Как я могу избежать этого?

Это всего лишь симуляция реальной ситуации, когда потребительский процесс перезапускается с помощью monit, и я страдаю от потери сообщений.

Ответы [ 2 ]

2 голосов
/ 03 мая 2019

Для начала вы должны использовать последнюю версию Pika.

Когда вы устанавливаете no_ack=True (auto_ack=True в Pika 1.0), RabbitMQ считает сообщение подтвержденным, когда оно доставлено.Это означает, что каждое сообщение, которое ваш потребитель имеет в памяти (или в стеке TCP) при остановке, будет потеряно, потому что RabbitMQ считает его подтвержденным.

Вы должны использовать no_ack=False (по умолчанию) и подтверждать сообщенияв handle_delivery как только ваша работа будет завершена.Обратите внимание, что если ваша работа занимает много времени, вы должны сделать это в другом потоке, чтобы предотвратить блокировку цикла ввода / вывода Пики.

См. Следующую документацию: https://www.rabbitmq.com/confirms.html


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы по StackOverflow.

0 голосов
/ 03 мая 2019

Поскольку вы уже объявили prefetch_count равным 1, а ваша очередь надежна, то при запуске потребителя она будет обрабатывать сообщения только одно за другим. Чтобы проверить то же самое, вы можете поставить сон на 1 секунду в своем коде и попытаться перезапустить потребителя через несколько секунд. Вы увидите, что обработанные сообщения удаляются только из очереди. если вы не установили счетчик предварительной выборки, то один потребитель запустил только все сообщения, вымытые из вашей очереди. Надеюсь, это поможет.

...