Как выборочно удалять сообщения из очереди AMQP (RabbitMQ)? - PullRequest
7 голосов
/ 08 августа 2010

Я хотел бы выборочно удалять сообщения из очереди AMQP, даже не читая их.

Сценарий выглядит следующим образом:

Отправляющая сторона хочет истечь сообщения типа X на основефакт, что поступила новая информация типа X.Поскольку вполне вероятно, что подписчик еще не принял последнее сообщение типа X, издатель должен просто удалить предыдущие сообщения типа X и поместить в очередь самое новое.Вся операция должна быть прозрачной для абонента - на самом деле он должен использовать что-то простое, например STOMP, для получения сообщений.

Как это сделать с помощью AMQP?Или может быть удобнее в другом протоколе обмена сообщениями?

Я бы хотел избежать сложной инфраструктуры.Весь необходимый обмен сообщениями так же прост, как указано выше: одна очередь, один подписчик, один издатель, но издатель должен иметь возможность произвольно удалять сообщения по заданным критериям.

Клиент издателя будет использовать Rubyно на самом деле я разбираюсь с любым языком, как только узнаю, как это сделать в протоколе.

Ответы [ 4 ]

9 голосов
/ 01 июня 2011

Вам не нужна очередь сообщений, вам нужна база данных значений ключей.Например, вы можете использовать Redis или Tokyo Tyrant, чтобы получить простую доступную по сети базу данных значений ключей.Или просто используйте memcache.

Каждый тип сообщения является ключом.Когда вы пишете новое сообщение с тем же ключом, оно перезаписывает предыдущее значение, поэтому читатель этой базы данных никогда не сможет получить устаревшую информацию.

На данный момент вам нужна только очередь сообщений дляустановите порядок чтения ключей, если это важно.В противном случае просто постоянно сканируйте базу данных.Если вы постоянно сканируете базу данных, лучше расположить базу рядом со считывателями, чтобы уменьшить сетевой трафик.

Я бы, вероятно, сделал что-то подобноекоторые содержат typecode, lastUpdated Таким образом, читатель может сравнить последние обновленные для этого ключа с тем, который они в последний раз читали из базы данных, и пропустить чтение, потому что они уже обновлены.с AMQP, затем используйте RabbitMQ и пользовательский тип обмена, в частности, обмен кэшем Last Value.Пример кода здесь https://github.com/squaremo/rabbitmq-lvc-plugin

6 голосов
/ 09 августа 2010

В настоящее время вы не можете делать это в RabbitMQ (или, в более общем случае, в AMQP) автоматически. Но вот простой способ.

Допустим, вы хотите отправить три типа сообщений: Xs, Ys и Zs. Если я правильно понимаю ваш вопрос, когда приходит сообщение X, вы хотите, чтобы брокер забыл все остальные сообщения X, которые не были доставлены.

Это довольно легко сделать в RabbitMQ:

  • производитель объявляет три очереди: X, Y и Z (они автоматически привязываются к обмену по умолчанию со своими именами в качестве ключей маршрутизации, а это именно то, что нам нужно),
  • при публикации сообщения производитель сначала удаляет соответствующую очередь (поэтому, если он публикует сообщение X, он сначала очищает очередь X); это эффективно удаляет устаревшие сообщения,
  • потребитель просто потребляет из очереди, которую хочет (X для сообщений X, Y для сообщений Y и т. Д.); с его точки зрения, он просто должен сделать basic.get, чтобы получить следующее соответствующее сообщение.

Это подразумевает состояние гонки, когда два производителя отправляют сообщения одного типа примерно в одно и то же время. В результате очередь может иметь два (или более) сообщения одновременно, но количество сообщений ограничено числом производителей, а лишние сообщения удаляются при следующей публикации. это не должно быть большой проблемой.

Подводя итог, у этого решения есть только один дополнительный шаг от оптимального решения, а именно, очередь очистки X перед публикацией сообщения типа X.

Если вам нужна помощь в настройке этой конфигурации, идеальным местом для консультации является список рассылки rabbitmq-обсуждения.

2 голосов
/ 22 апреля 2016

Кажется, что это работает и из веб-интерфейса RabbitMQ, если вы просто хотите удалить первые n сообщений из очереди

  • выберите очередь на вкладке "Очереди", прокрутите вниз до раздела "Получить сообщения "
  • установить параметр" Requeue = No "и количество сообщений, которые вы хотите удалить из очереди
  • нажать кнопку" Получить сообщения "
2 голосов
/ 23 ноября 2015

Этот вопрос имеет высокую наглядность благодаря названию.Проходя через описание, остановимся на более конкретном сценарии.Поэтому для тех пользователей, которые хотят действительно удалить следующее (помните FIFO) сообщение из очереди, вы можете использовать rabbitmqadmin и выполнить следующую команду:

rabbitmqadmin get queue=queuename requeue=false count=1

Эта командапо существу потребляет сообщение и ничего не делает.Полная команда с флагом для резервного копирования сообщений может выглядеть так, как показано ниже.Обязательно добавьте любые другие параметры согласно вашему требованию.

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

...