RabbitMQ: постоянное сообщение с темой обмена - PullRequest
60 голосов
/ 27 мая 2011

Я очень новичок в RabbitMQ.

Я настроил обмен темами. Потребители могут быть запущены после издателя. Я бы хотел, чтобы потребители могли получать сообщения, которые были отправлены до того, как они были запущены, и которые еще не были использованы.

Обмен настроен со следующими параметрами:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

Сообщения публикуются с этим параметром:

delivery_mode => 2

Потребители используют get () для получения сообщений от обмена.

К сожалению, любое сообщение, опубликованное до того, как был запущен какой-либо клиент, потеряно. Я использовал разные комбинации.

Полагаю, моя проблема в том, что обмен не хранит сообщения. Может быть, мне нужно иметь очередь между издателем и очередью. Но, похоже, это не работает с обменом «темами», когда сообщения направляются ключом.

Любая идея, как мне поступить. Я использую связывание Perl Net :: RabbitMQ (не должно иметь значения) и RabbitMQ 2.2.0.

Ответы [ 2 ]

63 голосов
/ 27 мая 2011

Вам нужна длительная очередь для хранения сообщений, если нет доступных подключенных потребителей для обработки сообщений во время их публикации.

Обмен не хранит сообщения, но очередь может.Заблуждение заключается в том, что биржи могут быть помечены как «надежные», но все, что на самом деле означает, что биржа сама по себе все еще будет там, если вы перезапустите своего брокера, но это означает , а не что любые сообщения, отправленные на этот обмен, автоматически сохраняются.

Учитывая это, есть два варианта:

  1. Выполнить административный шаг перед тем, как вы начнете издателейсоздать очередь (ы) самостоятельно.Вы можете использовать веб-интерфейс или инструменты командной строки, чтобы сделать это.Убедитесь, что вы создаете его как долговременную очередь, так что он будет хранить любые сообщения, которые направляются на него, даже если нет активных потребителей.
  2. Предполагается, что ваши потребители закодированы, чтобы всегда объявлять (и, следовательно, автоматически создавать)их обмены и очереди при запуске (и что они объявляют их надежными), просто запустите всех своих потребителей хотя бы один раз , прежде чем запускать каких-либо издателей.Это гарантирует, что все ваши очереди будут созданы правильно.Затем вы можете отключить потребителей до тех пор, пока они действительно не понадобятся, потому что очереди будут постоянно хранить любые будущие сообщения, перенаправленные им.

Я бы пошел на # 1.Может быть не так много шагов, которые нужно выполнить, и вы всегда можете написать сценарий, необходимый для повторения.Кроме того, если все ваши потребители собираются извлечь из одной и той же очереди (а не иметь выделенную очередь для каждой), это действительно минимальная часть административных издержек.

Очереди - это то, что нужно правильно контролировать и контролировать.В противном случае вы можете получить мошенников, которые будут объявлять длительные очереди, используя их в течение нескольких минут, но никогда больше.Вскоре после этого у вас будет постоянно растущая очередь, в которой ничто не уменьшит ее размер, и приближающийся апокалипсис брокера.

18 голосов
/ 27 июня 2014

Как упомянул Брайан, обмен не хранит сообщения и в основном отвечает за маршрутизацию сообщений либо в другой обмен, либо в очередь. Если обмен не привязан к очереди, все сообщения, отправленные на этот обмен, будут «потеряны»

Вам не нужно объявлять фиксированные клиентские очереди в скрипте издателя, поскольку это может не масштабироваться. Ваши издатели могут динамически создавать очереди и маршрутизировать их с помощью привязки обмена к обмену.

RabbitMQ поддерживает привязки обмена к обмену, что обеспечивает гибкость топологии, развязку и другие преимущества. Вы можете прочитать больше здесь на RabbitMQ Exchange для биржевых привязок [AMPQ]

Связывание RabbitMQ Exchange to Exchange

Example Topology

Пример кода Python для создания привязки обмена к обмену с сохранением, если в очереди нет ни одного потребителя.

#!/usr/bin/env python
import pika
import sys


connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()


#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)

#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)

#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')

##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)

#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
...