Как вы обрабатываете сообщения параллельно, обеспечивая при этом FIFO для каждой сущности? - PullRequest
4 голосов
/ 15 апреля 2011

Допустим, у вас есть сущность, скажем, «Персона» в вашей системе, и вы хотите обрабатывать события, которые изменяют различные сущности Персоны. Важно что:

  • События для одного и того же лица обрабатываются в порядке FIFO
  • Потоки событий с несколькими людьми обрабатываются параллельно разными потоками / процессами

У нас есть реализация, которая решает эту проблему, используя общую базу данных и блокировки. Потоки конкурируют за получение блокировки для Person, а затем обрабатывают события по порядку после получения блокировки. Мы хотели бы перейти в очередь сообщений, чтобы избежать опроса и блокировки, что, по нашему мнению, уменьшит нагрузку на БД и упростит реализацию кода потребителя.

Я провел некоторые исследования в ActiveMQ, RabbitMQ и HornetQ, но не вижу очевидного способа реализовать это.

ActiveMQ поддерживает групповые символы подписки потребителя, но я не вижу способа ограничить параллелизм в каждой очереди до 1. Если бы я мог это сделать, то решение было бы простым:

  • Каким-то образом указать брокеру разрешить параллелизм 1 для всех очередей, начиная с: /queue/person.
  • Издатель записывает событие в очередь, используя Person ID в имени очереди. например: /queue/person.20
  • Потребители подписываются на очередь, используя подстановочные знаки: /queue/person.>
  • Каждый потребитель получит сообщения для очередей разных людей. Если используются все очереди пользователей, некоторые потребители могут бездействовать, что нормально
  • После обработки сообщения потребитель отправляет ACK, который сообщает посреднику, что он завершил работу с этим сообщением, и позволяет отправить другое сообщение для этой очереди Person другому потребителю (возможно, тому же)

ActiveMQ подошел вплотную: вы можете использовать групповые подписки и включить «эксклюзивного потребителя», но эта комбинация приводит к тому, что один потребитель получает все сообщения, отправленные во все соответствующие очереди, уменьшая ваш параллелизм до 1 для всех людей. Я чувствую, что упускаю что-то очевидное.

Вопросы:

  • Есть ли способ реализовать описанный выше подход с какой-либо крупной реализацией очереди сообщений? Мы довольно открыты для вариантов. Единственное требование - чтобы он работал на Linux.
  • Есть ли другой способ решения общей проблемы, который я не рассматриваю?

Спасибо!

Ответы [ 3 ]

3 голосов
/ 15 апреля 2011

Похоже, JMSXGroupID - это то, что я ищу. Из документов ActiveMQ:

http://activemq.apache.org/message-groups.html

Их пример использования с ценами на акции - именно то, что мне нужно. Меня беспокоит только то, что произойдет, если один потребитель умрет. Надеемся, что брокер обнаружит это и выберет другого потребителя, который будет связан с этим идентификатором группы.

1 голос
/ 15 апреля 2011

Один общий способ решения этой проблемы (если я правильно понял вашу проблему) - это ввести какое-то уникальное свойство для Person (скажем, идентификатор уровня базы данных Person) и использовать хеш этого свойства в качестве индекса очереди FIFO, чтобы поместить этоPerson in.
Поскольку хеш этого свойства может быть громоздким (вы не можете позволить себе 2 ^ 32 очередей / потоков), используйте только N наименее значимых битов этого хеша.Каждая очередь FIFO должна иметь выделенного работника, который будет работать над ней - вуаля, ваши требования удовлетворены!

У этого подхода есть один недостаток - ваши сотрудники должны иметь хорошо распределенные идентификаторы, чтобы все очереди работали с большей -или менее равной нагрузки.Если вы не можете этого гарантировать, рассмотрите возможность использования набора очередей в циклическом порядке и проследите, какие люди обрабатываются сейчас, чтобы обеспечить последовательную обработку для одного и того же человека.

0 голосов
/ 15 апреля 2011

Если у вас уже есть система, которая допускает общие блокировки, почему бы не иметь блокировку для каждой очереди, которую потребители должны получить перед чтением из очереди?

...