Чтобы быть уверенным в параллелизме, одна и та же группа работает в нескольких очередях (FIFO) - PullRequest
0 голосов
/ 25 мая 2018

У меня вопрос о параллельности нескольких потребителей.Я хочу отправить работы на rabbitmq, которые поступают из веб-запроса в распределенные очереди.Я просто хочу быть уверен в порядке работ в нескольких очередях (FIFO).Поскольку этот запрос исходит от разных пользователей, необходимо заказывать / обрабатывать все пользовательские запросы.

Я обнаружил эту функцию с разными именами в Azure ServiceBus и ActiveMQ, группирующих сообщения.

Есть ли способ сделатьэто в симпатичном RabbitMQ?

Я хочу гарантировать, что запросы клиентов должны заказываться друг у друга.У каждого клиента может быть несколько запросов, но эти запросы для этого клиента должны быть обработаны по порядку.Я хочу быстро обрабатывать входящие запросы с использованием нескольких потребителей на разных узлах.Например, разные клиенты от 1 до 1000 отправляют запросы более чем на 1 миллион.Если я помещу этот огромный запрос только в одну очередь, это займет много времени.Поэтому я хочу разделить эту загрузку процесса между n (5) узлами.Для клиента X запросы должны быть в той же последовательности для обработки

Ответы [ 3 ]

0 голосов
/ 25 мая 2018

При работе с системами, основанными на событиях, и особенно при использовании нескольких производителей и / или потребителей, важно смириться с тем фактом, что обычно не существует такого понятия, как гарантированный порядок событий.И чтобы получить надежную систему, также целесообразно спроектировать систему так, чтобы обработчики сообщений были идемпотентными;они должны терпеть получение одного и того же сообщения дважды (или более).

Существует множество способов, которые могут (и фактически должны позволять) мешать порядку;

  • Производители могут доставлять сообщения в несколько ином темпе
  • Один производитель может пропустить квитанцию ​​(из-за пропущенного пакета) и повторно отправит сообщение
  • Один потребитель может получить и обработать сообщение,но подтверждение на обратном пути теряется, поэтому сообщение доставляется дважды (другому потребителю).
  • Некоторые другие службы, от которых зависят ваши обработчики, могут быть недоступны, поэтому вам придется отклонить сообщение.

При этом существует одна схема, которую системы сервисных шин, такие как NServicebus , используют для принудительного использования сообщений заказа.Существуют некоторые требования:

  • Вам потребуется централизованное хранилище (например, sql-сервер или хранилище документов), которое позволяет выполнять условные обновления;например, вы хотите иметь возможность хранить порядковый номер последнего обработанного сообщения (или как далеко вы продвинулись в процессе), но только , если уже сохраненная последовательность / ход выполнения является правильной / ожидаемой,Хранение идентификатора пользователя и прогресса даже для миллионов клиентов должно быть очень простой операцией для большинства баз данных.
  • Вы должны убедиться, что очередь настроена на dead-letter-queue / exchange для повторных попыток, а затем снова установите исходную очередь в качестве очереди недоставленных писем.
  • Вы устанавливаете TTL (например, 30 секунд) в очереди на повторную попытку / недоставленное письмо.Таким образом, сообщения, которые появляются в очереди недоставленных писем, будут автоматически возвращаться в исходную очередь через некоторое время.
  • При обработке сообщений вы проверяете свое хранилище / базу данных, если вы находитесь в правильном состоянии дляобработать сообщение (т.е. необходимые предыдущие шаги уже выполнены).
    • Если вы можете справиться с этим, вы делаете и обновляете хранилище (условно!).
    • Если не - вы убираете сообщение, чтобы оно было брошеноочередь мертвых писем.По сути, вы говорите: «Нет, я не могу обработать это сообщение, возможно, в очереди есть еще какое-то сообщение, которое должно быть обработано первым».

Таким образом, счастливый-path - обрабатывать большое количество сообщений в правильном порядке.Но если что-то случится и вы получите сообщение вне группы, вы бросите его в очередь повторов (очередь недоставленных писем), и Кролик удостоверится, что оно вернется в очередь для повторной попытки на более позднем этапе.,Но только с задержкой.

Прелесть этого в том, что вы способны справиться с большинством ситуаций, которые могут помешать обработке сообщения (неупорядоченные сообщения, отключение зависимых служб, отключение вашего обработчика)в середине обработки сообщения) точно таким же образом;отклоняя сообщение и позволяя вашей инфраструктуре (Кролику) позаботиться о том, чтобы через некоторое время его повторили.

0 голосов
/ 04 июня 2018

RabbitMQ Consisten Hash Exchange Тип

Мы используем RabbitMQ, и мы нашли плагин.Он использует алгоритм согласованного хеширования для рассылки сообщений по согласованным ключам.

Для получения дополнительной информации о согласованном хешировании;

https://en.wikipedia.org/wiki/Consistent_hashing

https://www.youtube.com/watch?v=viaNG1zyx1g

Вы можете найти этот плагин с веб-страницы rabbitmq

Плагин: rabbitmq_consistent_hash_exchange

https://www.rabbitmq.com/plugins.html

0 голосов
/ 25 мая 2018

(при условии, что OP спрашивает о таких вещах, как ActiveMQs) группировка сообщений:)

Это в настоящее время не встроено в RabbitMQ AFAIK (по состоянию на этот ответ не было в 2013 году)1004 *), и я не знаю об этом сейчас (хотя в последнее время я не успеваю).

Однако модель обмена и очередей RabbitMQ очень гибкая - обмены и очереди могут быть легко созданы динамически (это может быть сделано в других системах обмена сообщениями, но, например, если вы прочитаете документацию ActiveMQ или документацию Red Hat AMQ, вы найдете все примеров в руководствах пользователя, использующих предварительно объявленные очереди в файлах конфигурациизагружается при запуске системы - за исключением RPC-подобных запросов / ответов).

Кроме того, в RabbitMQ очень легко для потребителя (т. е. потока, потребляющего сообщения) потреблять из нескольких очередей.

Чтобы вы могли построить поверх RabbitMQ систему, в которой вы получили желаемую семантику группировки.

Один из способов - создать динамическую очередь.es: при первом просмотре заказа клиента или создании новой группы заказов клиента будет создана очередь с уникальным именем для всех сообщений этой группы - это имя очереди будет передано (через другую очередь) потребителю, который является единственной цельюдолжен был балансировать нагрузку среди других потребителей, которые отвечали за обработку групп заказов клиентов.Т.е. балансировщик нагрузки извлекает из своей очереди сообщение «новая группа с именем очереди XYZ» и находит в пуле потребителя группы заказов потребителя, который может принять эту нагрузку, и передает ему сообщение «начать прослушивание».в XYZ ".

Еще один способ сделать это - паб / подпрограмма и маршрутизация тем - каждая группа заказов клиентов получит уникальную тему - и действуйте, как указано выше.

...