Изменение порядка сообщений в RabbitMQ - PullRequest
9 голосов
/ 11 февраля 2011

RabbitMQ отмечает все поля для проекта, который я планирую, за исключением одного. Я бы хотел, чтобы разные работники слушали очередь, и важно, чтобы они сначала обрабатывали самые новые сообщения (т.е. последний порядковый номер) (LIFO).

Мое приложение таково, что новые сообщения в значительной степени устарели. Если у вас есть свободные рабочие, вы все равно можете обрабатывать старые сообщения, но важно, чтобы новые сообщения выполнялись первыми.

После обхода различных форумов и тому подобного я вижу только одно решение - клиент должен обработать сообщение первым:

  • потреблять все сообщения
  • переупорядочить их по порядковому номеру
  • повторно отправить в очередь
  • потребляет первое сообщение

Безобразно и проблематично, если клиент умирает на полпути. Но у кого-нибудь здесь есть лучшее решение.

Мои исследования основаны (частично) на:

Примечание: ожидаемый трафик сообщений будет примерно в диапазоне 1 мсг / час для одних очередей и 100 м / мин для других. Так что ничего звездного.

Ответы [ 2 ]

3 голосов
/ 16 февраля 2011

Поскольку ответа нет, наверное, я хорошо выполнил домашнее задание;)

В любом случае, после обсуждения требований с другими заинтересованными сторонами было решено на время отказаться от требования LIFO. Мы можем беспокоиться об этом, когда дело доходит до этого.

Решение, которое мы, вероятно, в конечном итоге примем, состоит в том, чтобы работник открыл вторую очередь, которую мастер может использовать, чтобы дать работнику знать, какие задания игнорировать + предоставить дополнительную информацию управления / мониторинга (которая, как нам кажется, нам понадобится) в любом случае).

RabbitMQ, реализующий спецификацию AMQP 1.0, также может помочь здесь.

Так что я отмечу этот вопрос как ответ на данный момент. Кто-то еще может свободно добавлять или улучшать.

1 голос
/ 20 февраля 2011

Одной из возможностей может быть использование basic.get в цикле и ожидание, пока ответ basic-ok.message-count станет нулевым (отбрасывая все остальные сообщения):

while (<get ok> = <call basic.get>) {
  if (<get ok>.message-count == 0) {
    // Now <get ok> is the most recent message on this queue
    break;
  } else if (<is get-empty>) {
    // Someone else got it
  }
}

Конечно, вы получитенастроить шаблоны маршрутизации сообщений на брокере таким образом, чтобы один потребитель, выбрасывающий сообщения, не связывался с другим.Постарайтесь избегать очередей сообщений, поскольку они будут помещаться в очередь на вершине стека, делая их похожими на самые последние.

...