Блокирует и получает пакетные сообщения с RabbitMq - PullRequest
9 голосов
/ 24 января 2012

Я пытаюсь использовать RabbitMq более нетрадиционным способом (хотя в этот момент я могу выбрать любую другую реализацию очереди сообщений, если это необходимо).Вместо того, чтобы отправлять push-сообщения Rabbit моим потребителям, потребитель подключается к очереди и получает пакет из N сообщений (в течение которых он потребляет некоторые, а возможно, отклоняет некоторые), после чего он переходит в другую очередь и так далее.Это сделано для избыточности.Если некоторые потребители терпят крах, все сообщения гарантированно потребляются другим пользователем.

Проблема в том, что у меня несколько потребителей, и я не хочу, чтобы они конкурировали в одной и той же очереди.Есть ли способ гарантировать блокировку очереди?Если нет, могу ли я по крайней мере удостовериться, что если два потребителя подключены к одной и той же очереди, они не читают одно и то же сообщение?Транзакции могут в некоторой степени помочь мне, но я слышал, что они будут удалены из RabbitMQ.

Также приветствуются другие архитектурные предложения.

Спасибо!

РЕДАКТИРОВАТЬ: Как указано в комментарии, есть особенность в том, как мне нужно обрабатывать сообщения.Они имеют смысл только в группах, и существует высокая вероятность того, что связанные сообщения объединяются в очередь.Например, если я получу пакет из 100 сообщений, существует высокая вероятность, что я смогу что-то сделать с сообщениями 1-3, 4-5,6-10 и т. Д. Если мне не удастся найти группу для некоторых сообщений, яПовторно отправлю их в очередь.WorkQueue не будет работать, потому что он будет рассылать сообщения из одной группы нескольким работникам, которые не будут знать, что с ними делать.

Ответы [ 3 ]

8 голосов
/ 29 января 2012

Вы уже видели эту бесплатную онлайн-книгу по Шаблонам корпоративной интеграции ?

Похоже, вам действительно нужен рабочий процесс, в котором у вас есть компонент-дозатор, прежде чем сообщения попадут к вашим работникам. С RabbitMQ есть два способа сделать это. Либо используйте тип обмена (и формат сообщения), который может выполнить пакетирование для вас, либо иметь одну очередь, и рабочий, который сортирует пакеты и помещает каждый пакет в свою очередь. Дозатор, вероятно, также должен отправить сообщение «готово к отправке» в очередь управления, чтобы рабочий мог обнаружить существование новой очереди. После обработки пакета рабочий может удалить очередь пакета.

Если у вас есть контроль над форматом сообщения, вы можете получить RabbitMQ для неявного выполнения пакетирования несколькими способами. При обмене темами вы можете убедиться, что ключ маршрутизации в каждом сообщении имеет формат work.batchid.something, а затем работник, который узнает о существовании пакетного xxyzz, будет использовать только ключ привязки, такой как # .xxyzz. #, Чтобы потреблять эти сообщения. Переиздание не требуется.

Другой способ - включить идентификатор пакета в заголовок и использовать более новый тип обмена заголовками. Конечно, вы также можете реализовать свои собственные типы обмена, если вы хотите написать небольшое количество кода Erlang.

Тем не менее, я рекомендую проверить книгу, потому что она дает лучший обзор архитектуры обмена сообщениями, чем типичная концепция рабочей очереди, с которой большинство людей начинают.

2 голосов
/ 25 января 2012

Пусть ваши потребители вытащат из одной очереди.Они гарантированно не будут обмениваться сообщениями (Rabbit проведет циклический перебор сообщений среди подключенных в настоящее время потребителей), и он будет сильно оптимизирован для этого точного шаблона использования.

Он готов к использованию, из коробки,В документах RabbitMQ это называется моделью Work Queue .Одна очередь, несколько потребителей, и никто из них не делится чем-либо.Похоже, что вам нужно.

0 голосов
/ 22 ноября 2014

Вы можете установить счетчик предварительной выборки на уровне канала / потребителя, чтобы получать сообщения в пакетах. Для повторной отправки сообщений следует использовать метод AMQP basic.reject, и эти сообщения можно выбрать для добавления в очередь или для пересылки в очередь недоставленных сообщений. Несколько потребителей, пытающихся извлечь сообщения из одной и той же очереди, не являются проблемой, поскольку метод AMQP basic.get будет синхронизирован для обработки одновременных потребителей.

https://groups.google.com/forum/#!topic/rabbitmq-users/hJ8f5du-GCA

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...