Допустим, у вас есть сущность, скажем, «Персона» в вашей системе, и вы хотите обрабатывать события, которые изменяют различные сущности Персоны. Важно что:
- События для одного и того же лица обрабатываются в порядке FIFO
- Потоки событий с несколькими людьми обрабатываются параллельно разными потоками / процессами
У нас есть реализация, которая решает эту проблему, используя общую базу данных и блокировки. Потоки конкурируют за получение блокировки для Person, а затем обрабатывают события по порядку после получения блокировки. Мы хотели бы перейти в очередь сообщений, чтобы избежать опроса и блокировки, что, по нашему мнению, уменьшит нагрузку на БД и упростит реализацию кода потребителя.
Я провел некоторые исследования в ActiveMQ, RabbitMQ и HornetQ, но не вижу очевидного способа реализовать это.
ActiveMQ поддерживает групповые символы подписки потребителя, но я не вижу способа ограничить параллелизм в каждой очереди до 1. Если бы я мог это сделать, то решение было бы простым:
- Каким-то образом указать брокеру разрешить параллелизм 1 для всех очередей, начиная с: /queue/person.
- Издатель записывает событие в очередь, используя Person ID в имени очереди. например: /queue/person.20
- Потребители подписываются на очередь, используя подстановочные знаки: /queue/person.>
- Каждый потребитель получит сообщения для очередей разных людей. Если используются все очереди пользователей, некоторые потребители могут бездействовать, что нормально
- После обработки сообщения потребитель отправляет ACK, который сообщает посреднику, что он завершил работу с этим сообщением, и позволяет отправить другое сообщение для этой очереди Person другому потребителю (возможно, тому же)
ActiveMQ подошел вплотную: вы можете использовать групповые подписки и включить «эксклюзивного потребителя», но эта комбинация приводит к тому, что один потребитель получает все сообщения, отправленные во все соответствующие очереди, уменьшая ваш параллелизм до 1 для всех людей. Я чувствую, что упускаю что-то очевидное.
Вопросы:
- Есть ли способ реализовать описанный выше подход с какой-либо крупной реализацией очереди сообщений? Мы довольно открыты для вариантов. Единственное требование - чтобы он работал на Linux.
- Есть ли другой способ решения общей проблемы, который я не рассматриваю?
Спасибо!