Согласно документации RabbitMQ на Благодарности потребителя :
Когда сообщение ставится в очередь, оно по возможности помещается на свою исходную позицию в своей очереди. Если нет (из-за одновременных доставок и подтверждений от других потребителей, когда несколько потребителей совместно используют очередь), сообщение будет помещено в очередь ближе к заголовку очереди.
То есть с одним клиентским потребителем, если очередь сервера изначально
хвост [c b a] голова
и клиентский клиент потребляет сообщение заголовка ("a"), очередь сервера должна стать:
хвост [c b] голова
Затем, если клиентский клиент фиксирует обработанное сообщение, сообщение должно быть поставлено в очередь в очереди сервера в заголовке (его «исходная позиция» согласно документации), и очередь сервера должна стать:
хвост [c b a] голова
Наконец, клиентский клиент должен снова потреблять то же самое сообщение заголовка ("a").
Но это не то, что я наблюдал, используя библиотеку Python Pika. Что я заметил, так это то, что сообщения с задержкой помещаются в очередь в хвосте очереди сервера, а не в голове («исходная позиция»). Документация RabbitMQ верна или библиотека Пика верна?
Пример кода:
import logging
import pika
logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
parameters = pika.ConnectionParameters()
# Produce messages
with pika.BlockingConnection(parameters) as connection:
queue = "foobar"
routing_key = queue
channel = connection.channel()
channel.queue_declare(queue=queue)
for body in ["a", "b", "c"]:
channel.publish(exchange="", routing_key=routing_key, body=body)
logging.info("Produced message %r with routing key %r", body, routing_key)
# Consume messages
def handle(channel, method, properties, body):
logging.info("Consumed message %r from queue %r", body.decode(), queue)
channel.basic_nack(method.delivery_tag)
with pika.BlockingConnection(parameters) as connection:
queue = "foobar"
channel = connection.channel()
channel.queue_declare(queue=queue)
channel.basic_consume(queue=queue, on_message_callback=handle)
channel.start_consuming()
Выход:
INFO: root: созданное сообщение 'a' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: созданное сообщение 'b' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: созданное сообщение 'c' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'c' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'c' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: использованное сообщение 'c' из очереди 'foobar'