RabbitMQ / AMQP: несколько очередей, один потребитель - PullRequest
0 голосов
/ 02 мая 2018

Я хочу создать получателя, который обрабатывает сообщения из множества переменных источников, которые подключаются или отключаются динамически.

Мне нужно, чтобы каждый потребитель располагал по приоритетам первые N сообщений каждого источника . Затем запустить несколько потребителей, чтобы улучшить скорость.

Я читал документы для Рабочие очереди , Маршрутизация и Темы и множество других документов без определения того, как это реализовать. Также я сделал несколько тестов без удачи.

Может кто-нибудь указать мне, как это сделать или где об этом прочитать?

- EDIT -

QueueA ----- A3 - A2 - A1-┐

ОчередьB ----- B3 - B2 - B1-┼ ------ Потребитель

* * 1 022 QueueC ----- С3 - С2 - С1-┘

Желаемый эффект состоит в том, что каждый потребитель получает первые сообщения из каждой очереди. Например: A1, B1, C1, A2, B2, C2, A3, B3, C3 и так далее. Если создается новая очередь (QueueD), потребитель начинает получать от нее сообщения таким же образом.

Заранее спасибо

1 Ответ

0 голосов
/ 05 мая 2018

Мне нужно, чтобы каждый потребитель располагал по приоритетам первые N сообщений каждого источника. Затем запустить несколько потребителей, чтобы улучшить скорость.

Все очереди сообщений, о которых я знаю, предоставляют гарантии упорядочения только внутри самой очереди (Kafka обеспечивает гарантию упорядочения не на уровне очереди, а внутри разделов в очередях). Однако здесь вы просите сериализовать несколько очередей. Что будет невозможно в контексте распределенной системы.

Почему? потому что, если в этих очередях более одного потребителя, сообщения будут доставляться каждому подключенному потребителю очереди в циклическом порядке.

Предполагая prefetch_count=1 и с двумя подключенными потребителями, произнесите первый набор доставленных сообщений следующим образом:

  • A1, B1 и C1 доставлено потребителю 1 (X)
  • A2, B2 & C2 доставлено потребителю 2 (Y)

Теперь в распределенной системе все асинхронно, и все может пойти не так. Например:

Если X подтвердит A1, A3 будет доставлено в X. Но если Y подтвердит A2 до X, A3 будет доставлено в Y.

Кто первым ответит, не находится под вашим контролем в распределенной системе. Рассмотрим следующие сценарии:

  • X, возможно, пришлось ждать ввода-вывода или задачи, связанной с процессором, а Y, возможно, повезло, что ему не пришлось ждать. Затем Y будет проходить через сообщения в очереди.
  • Или Y был убит (раздел) или n / w стал медленным, тогда X продолжит использовать очередь.

Я настоятельно рекомендую вам переосмыслить ваши требования и рассмотреть ваши ожидаемые гарантии в асинхронном контексте (иначе вы бы не рассматривали МоМ, не так ли?).


PS: возможно реализовать то, что вы просите, с некоторой логикой на стороне потребителя (с наценкой на производительность / пропускную способность).

  • Один потребитель должен подключиться ко всем очередям
  • дождаться сообщений из каждой очереди, прежде чем проверять сообщения.
  • После получения сообщения из каждой очереди сгруппируйте их как одно сообщение и опубликуйте в другой очереди (P).
  • Теперь многие потребители могут подписаться на P для обработки упорядоченной группы сообщений.

Я не советую это, но эй, это ваша система, которая остановит вас;)

...