Как я могу восстановить неподтвержденные сообщения AMQP из других каналов, кроме моего собственного соединения? - PullRequest
43 голосов
/ 15 августа 2011

Кажется, чем дольше я работаю на своем сервере rabbitmq, тем больше у меня проблем с неподтвержденными сообщениями.Я хотел бы потребовать их.На самом деле, кажется, для этого есть команда amqp, но она применяется только к каналу, который использует ваше соединение.Я создал небольшой скрипт pika, чтобы хотя бы попробовать его, но я либо что-то упустил, либо это невозможно сделать (как насчет rabbitmqctl?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

Ответы [ 3 ]

62 голосов
/ 15 августа 2011

Неподтвержденные сообщения - это сообщения, которые были доставлены по сети потребителю, но еще не были подтверждены или отклонены, но этот потребитель еще не закрыл канал или соединение, по которому он их первоначально получил. Таким образом, брокер не может понять, сколько времени потребителю потребовалось для обработки этих сообщений или он забыл о них. Таким образом, он оставляет их в неподтвержденном состоянии до тех пор, пока либо потребитель не умрет, либо они не будут подтверждены или отклонены.

Поскольку эти сообщения все еще могут быть корректно обработаны в будущем еще живым потребителем, который первоначально потреблял их, вы не можете (насколько мне известно) вставить другого потребителя в смесь и попытаться принять внешние решения о них. Вам нужно заставить своих потребителей принимать решения по каждому сообщению по мере его обработки, а не оставлять старые сообщения неподтвержденными.

19 голосов
/ 25 сентября 2013

Если сообщения не распакованы, есть только два способа вернуть их в очередь:

  1. basic.nack

    Эта команда будетпривести к тому, что сообщение будет помещено обратно в очередь и доставлено.

  2. Отключение от посредника

    Это действие приведет к тому, что все непрочитанные сообщения из этогоканал, который нужно вернуть в очередь.

ПРИМЕЧАНИЕ : basic.recover попытается переиздать неупакованные сообщения на том же канале (тому же потребителю), чтоиногда желаемое поведение.

Спецификация RabbitMQ для basic.recover и basic.nack


Реальный вопрос: почему сообщения не подтверждены?

Возможные сценарии, вызывающие неупакованные сообщения:

  1. Потребитель получает слишком много сообщений, затем не обрабатывает и не проверяет их достаточно быстро.

    Решение: Предварительно выберите как можно меньше сообщений.

  2. Ошибка клиентской библиотеки (у меня есть эта проблема в настоящее время с pika 0.9.13 .Если в очереди много сообщений, определенное количество сообщений застрянет без распаковки, даже спустя несколько часов.

    Решение: мне придется перезапускать потребителя несколько раз, пока все не распакованные сообщения не будут удалены из очереди.

4 голосов
/ 25 ноября 2016

Все неподтвержденные сообщения перейдут в состояние готовности после остановки всех рабочих / потребителей.

Убедитесь, что все работники остановлены, подтвердив вывод grep на ps aux и остановив / убив их, если они найдены.

Если вы управляете работниками с помощью супервизора, который отображается как работникостановлен, вы можете проверить наличие зомби.Supervisor сообщает об остановке работника, но вы все равно обнаружите, что зомби-процессы работают при отключении вывода ps aux.Уничтожение процессов зомби вернет сообщения в состояние готовности.

...