Временное отключение доставки сообщения JMS на основе свойства сообщения - PullRequest
1 голос
/ 09 октября 2011

У меня есть требование, о котором я сейчас не знаю, возможно ли это вообще.Я хотел бы временно отключить работу JMS-сообщения, если оно содержит указанное свойство.В настоящее время я использую HornetQ в качестве поставщика сообщений.

Давайте создадим пример:

Очередь содержит следующие три записи:

{1, "foo", "A_CATEGORY"}
{2, "bar", "B_CATEGORY"}
{9, "bof", "A_CATEGORY"}

В определенный момент приложениедолжен быть в состоянии сообщить серверу сообщений HornetQ, что сообщения, принадлежащие B_CATEGORY, не должны доставляться в данный момент (например, потому что базовая база данных для объектов B_CATEGORY обновляется).Таким образом, сообщение с идентификатором 2 не будет доставлено в данный момент, в то время как 1 и 9 будут доставлены, поскольку они имеют другое значение для объекта категории.

Это должно происходить из кода Java без перезапускаприложение на всех.Возможно ли это вообще?

Спасибо за помощь!


Просто подумал об альтернативном подходе к дизайну для этой проблемы.Предположим, что первая очередь содержит сообщения со всеми видами категорий (кстати, невозможно создать очередь для каждой категории, так как их может быть много).Эта «нормальная» очередь обычно настраивается (например, без истечения срока действия, но с DLQ).

Теперь, если слушатель потребляет такое сообщение и видит, что он не может обрабатывать сообщения, принадлежащие определенной категории, он помещает его во вторую очередь.Эта очередь настроена с задержкой повторной доставки и временем истечения.Если сейчас установить достаточно большое время истечения (конечно, не из-за переполнения очереди), а время повторной доставки не слишком короткое, то это должно сработать, если нет решения для вышеуказанного вопроса.

Конечнонеобходимо рассчитать, сколько из этих записей в очереди может быть создано в течение времени, когда категория не может быть обработана.А также сколько времени может занять такая недоступность для категории, чтобы можно было соответственно скорректировать повторную доставку.

Ответы [ 2 ]

1 голос
/ 10 октября 2011

Насколько я могу судить, это невозможно для управляемых сообщениями компонентов.

Подобная функциональность достижима для стандартного потребителя JMS:

 MessageConsumer c = session.createConsumer(destination);
 while ( b-category-can-be-processed ) {
     Message m = c.receive();
     // process messages until b category is OK to be processed
 }

 c.close();

 // now create a different consumer with message selector ignoring "B_CATEGORY"
 MessageConsumer c1 = session.createConsumer(destination, "Category <> 'B_CATEGORY'");
 while ( b-is-locked ) {
     Message m = c1.receive();
     // process messages until b category is locked
 }

 c1.close();
 // go to start

В этом примере предполагается, что вы можете сказать, когда обрабатывать B снова, на основе полученных сообщений. Если нет, то вы можете возобновить обычную рутину через определенное время. В примере также представлен только один поток выполнения.

Изучая этот путь дальше, вы можете взглянуть на Spring * DefaultMessageListenerContainer & mdash; Весеннее сообщение, управляемое бобом. Это может сделать именно то, что я описал, но гораздо более продвинутым способом. Он может подаваться с помощью селектора сообщений, и он активен, вы можете изменить его в любое время. Он также обрабатывает сообщения в нескольких потоках, если вы установите concurrentConsumers выше 1.

Что касается вашего решения с перенаправлением сообщений в другую очередь, когда они не могут быть обработаны, обратите внимание, что оно генерирует дополнительный трафик; Вы хотите, чтобы все ваши сообщения были обработаны в конце концов, верно? Почему бы не оставить их там, где они есть, и просто забрать их в подходящее время? Вам не нужно будет оценивать задержку доставки, которая может быть трудной.

0 голосов
/ 10 октября 2011

Вы можете создать основную очередь (или подписку) с фильтром и остановить очередь, используя API управления. Или, если вы работаете встраиваемым, вы можете просто вызвать паузу в объекте очереди сервера.

Поскольку это будет очень настраиваемая функция, вы, возможно, сможете использовать ее встроенную или внести специальные изменения в своей собственной ветви.

...