RabbitMQ предварительный заказ - PullRequest
0 голосов
/ 15 января 2019

Согласно документации RabbitMQ на Благодарности потребителя :

Когда сообщение ставится в очередь, оно по возможности помещается на свою исходную позицию в своей очереди. Если нет (из-за одновременных доставок и подтверждений от других потребителей, когда несколько потребителей совместно используют очередь), сообщение будет помещено в очередь ближе к заголовку очереди.

То есть с одним клиентским потребителем, если очередь сервера изначально

хвост [c b a] голова

и клиентский клиент потребляет сообщение заголовка ("a"), очередь сервера должна стать:

хвост [c b] голова

Затем, если клиентский клиент фиксирует обработанное сообщение, сообщение должно быть поставлено в очередь в очереди сервера в заголовке (его «исходная позиция» согласно документации), и очередь сервера должна стать:

хвост [c b a] голова

Наконец, клиентский клиент должен снова потреблять то же самое сообщение заголовка ("a").

Но это не то, что я наблюдал, используя библиотеку Python Pika. Что я заметил, так это то, что сообщения с задержкой помещаются в очередь в хвосте очереди сервера, а не в голове («исходная позиция»). Документация RabbitMQ верна или библиотека Пика верна?

Пример кода:

import logging

import pika

logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
parameters = pika.ConnectionParameters()


# Produce messages

with pika.BlockingConnection(parameters) as connection:
    queue = "foobar"
    routing_key = queue
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    for body in ["a", "b", "c"]:
        channel.publish(exchange="", routing_key=routing_key, body=body)
        logging.info("Produced message %r with routing key %r", body, routing_key)


# Consume messages

def handle(channel, method, properties, body):
    logging.info("Consumed message %r from queue %r", body.decode(), queue)
    channel.basic_nack(method.delivery_tag)


with pika.BlockingConnection(parameters) as connection:
    queue = "foobar"
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    channel.basic_consume(queue=queue, on_message_callback=handle)
    channel.start_consuming()

Выход:

INFO: root: созданное сообщение 'a' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: созданное сообщение 'b' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: созданное сообщение 'c' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'c' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'c' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'c' из очереди 'foobar'

Ответы [ 2 ]

0 голосов
/ 15 января 2019

Спасибо, Оливье. С channel.basic_qos(prefetch_count=1) я получаю документированное поведение:

INFO: root: созданное сообщение 'a' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: созданное сообщение 'b' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: созданное сообщение 'c' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'

0 голосов
/ 15 января 2019

Поведение, с которым вы столкнулись, наиболее вероятно из-за поведения предварительной выборки.

Поскольку вы не указали желаемое качество обслуживания, я считаю (был бы признателен более осведомленный источник, подтверждающий эту точку?), Что предварительная выборка определяется сервером и, скорее всего, будет довольно высокой.

Идея состоит в том, что для обеспечения производительности клиент может получить несколько сообщений, что было бы выгодно в большинстве случаев:

  • если есть многопоточность на стороне потребителя, он, вероятно, может параллельно обрабатывать несколько сообщений, поэтому будет иметь несколько сообщений, еще не подтвержденных в любой момент времени
  • чтобы обеспечить более плавную обработку в «счастливых» случаях, клиент может подтвердить блок сообщений, сообщая серверу, что до данного сообщения все сообщения, полученные потребителем, подтверждены, это уменьшает накладные расходы, когда мы имеем случаи большого количества сообщений, которые требуют небольшой обработки

Если вы посмотрите ссылки на документацию ниже, они объяснят, как вы можете контролировать поведение.

Дополнительная информация по этим пунктам доступна по адресу:

...