Синхронизация рабочих компонентов, управляемых сообщениями JMS - PullRequest
1 голос
/ 07 мая 2009

Мы только начинаем строить нашу архитектуру JMS и имеем следующую базовую настройку:

  1. GLassfish v2.1
  2. MDB прослушивание темы через фабрику TopicConnection (все на локальном сервере)

Теперь MDB порождает рабочий поток, когда приходит новое сообщение, и хотя у нас есть порядок доставки сообщений, нам нужен механизм синхронизации, чтобы потоки проверяли определенное условие перед одновременной обработкой запроса.

Есть ли способ для этих потоков обмениваться данными? Или есть какие-то другие механизмы (кроме блокировки таблиц / строк в базе данных), которые мы можем использовать для синхронизации?

Заранее спасибо.


Чтобы уточнить, я не создаю свои собственные темы. Как все правильно отметили, контейнер делает это для меня. Позвольте мне помочь объяснить мою дилемму на примере.

- Сообщение A приходит в момент времени t = 0, что «создает» идентификатор данных 1

- Сообщение B достигает t = 0,1, что «обновляет» идентификатор данных 1

Теперь, предполагая, что контейнер порождает 2 рабочих для обработки A & B, и что для создания данных требуется гораздо больше времени, чем для обновления, обновление будет выполняться раньше и не будет иметь никакого эффекта.

Чтобы быть понятнее,

-При обработке сообщения B я бы искал данные с идентификатором 1 при t = 1 (не нашел бы его и, таким образом, закончил, ничего не делая).

-Данный идентификатор 1 будет создан при обработке сообщения А при t = 2.

Ответы [ 6 ]

6 голосов
/ 07 мая 2009

Предупреждение педанта! Я из тех парней, которые читают реальные спецификации технологий.

Чтение спецификации EJB версии 3.0, раздел 21.1.2 (Ограничения программирования) запрещает использование потоков в вашем коде. Вот язык и обоснование ...

Корпоративный компонент не должен пытаться управлять потоками. Боб предприятия не должен пытаться начать, остановить, приостановить или возобновить обсуждение или изменить приоритет потока или имя. Боб предприятия не должен пытаться управлять группами потоков.

Эти функции зарезервированы для контейнера EJB. Разрешение предприятия бин для управления потоками уменьшится способность контейнера правильно управлять средой выполнения.

Так что, если вы сделаете то, что говорите, полиция EJB постучится в вашу дверь посреди ночи и заберет вас. Или ваше приложение может работать со сбоями, и поставщик будет смеяться, когда вы жалуетесь. Или ничего плохого не произойдет.

Но, как говорит Даффимо, зачем это делать? Если вам нужна масштабируемость, предлагаемая множеством потоков, можете ли вы настроить ее для своего MDB? Смысл EJB в том, чтобы справляться с такими вещами для вас.

4 голосов
/ 07 мая 2009

Я не понимаю, почему MDB должен порождать рабочий поток. Есть пул потоков, связанный с прослушивателями сообщений в JMS. Это нить, которая должна делать работу.

Спецификация EJB гласит, что в ваших бобах не появляется нить. Контейнер обрабатывает потоки. Это также включает и MDB.

Слушатель должен обрабатывать сообщение, которое он удаляет из очереди. Необходимые данные должны быть в сообщении. Что нужно для обмена?

Я думаю, что ваш подход идет вразрез с рекомендованными практиками EJB.

3 голосов
/ 05 мая 2010

Настоящая проблема заключается в том, что сервер приложений обычно порождает упомянутых работников самостоятельно. Хотя JMS гарантирует, что сообщения потребляются в том же порядке, в каком они созданы, по крайней мере, в пределах одного производителя, спецификация MDB явно заявляет, что порядок не сохраняется (из-за упомянутых работников). См. Раздел 5.4.11 JSR-000220 Enterprise JavaBeans 3.0, финальная версия (ejbcore) .

Нет портативного и 100% надежного способа обойти это. Из-за характера рабочей гонки введены условия, которые не могут контролироваться.

К счастью, у большинства серверов приложений есть способы, хотя и собственные, и несовместимые, для настройки количества рабочих.

Для JBoss и ActiveMQ это работает:

@PoolClass(value = org.jboss.ejb3.StrictMaxPool.class, maxSize = 1)
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSessions", propertyValue = "1"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "TEST.FOO")
})
@ResourceAdapter("activemq-ra.rar")
public class NewMessageBean implements MessageListener { ... }

В этом случае «maxSessions» - это количество работников. Это может отличаться от других провайдеров JMS, но это должно указывать в правильном направлении.

2 голосов
/ 07 мая 2009

Как упомянуто выше, среда JMS обрабатывает проблемы планирования, такие как диспетчеризация потоков. Все, что вы делаете в этом, будет не только хуже, чем поведение по умолчанию, но, вероятно, серьезно ограничит функциональность вашего JMS.

Более сложные обработчики JMS предназначены для работы на нескольких узлах (= серверах), поэтому любое решение с общей памятью ограничит вас одной JVM на одном узле, что было бы жаль, поскольку большое преимущество JMS - это масштабируемость.

Возможное решение JMSy состояло бы в том, чтобы очередь «cookie» представляла собой одно фиктивное сообщение «cookie» для синхронизации действий. Когда пришло время для процесса выполнения спорной деятельности он «получает с ожиданием» в одном сообщении от «куков» очереди, когда спорная работа завершена он помещает печенье обратно в очереди. Магия JMS справится практически со всеми блокировками, ожиданием и исправлением ошибок.

0 голосов
/ 15 декабря 2009

Похоже, что вопрос полностью изменился в природе после того, как все уже ответили на него. Я добавлю очень запоздалый ответ для потомков; это предполагает, что есть некоторый тип идентификатора сообщения, который может использоваться для заказа. Вы говорите: «У нас есть в порядке доставки сообщений», но вы точно не говорите, как это достигается.

... Поскольку вы пояснили, что не запускаете свои собственные потоки, у вас в основном есть условие гонки между «созданием» данных @ ID = 1 и его последующим «обновлением». Я предполагаю, что вы блокируете данные @ ID = 1, пока они создаются и / или обновляются. Итак, есть две возможности:

  1. сначала появляется сообщение «создать данные»: (1) идентификатор блокировки = 1, (2) создать данные. (3) отпустите замок. (4) применить ожидающее «обновление».
  2. «обновление» приходит до «создания»: (1) идентификатор блокировки = 1, (2) создать недостающие данные (т.е. выполнить «upsert»: вставить данные, даже если их там нет). (3) отпустите замок. (4) игнорировать ожидающее сообщение «создать данные» (порядковый номер сообщения «создать» меньше, чем «обновления»)
    • Если отправителю сообщения разрешено отправлять обновления и вставлять одновременно (без запроса / ответа), тогда они действительно должны иметь своего рода порядковый номер. Если это так, то когда появится «insert», его порядковый номер будет меньше текущего значения в этой строке, поэтому его можно игнорировать. Или, если тип сообщения «создать» отличается от типа сообщения «обновление», то «создает» всегда можно игнорировать, если данные уже существуют.

Я думаю, у вас есть вопрос, как синхронизировать данные. По сути, потоки совместно используют объекты и создают мьютекс (взаимное исключение), чтобы позволить одному потоку доступ к данным, вызывая блокировку другого потока. Это можно сделать просто с помощью низкоуровневых средств синхронизации Java (ключевое слово «synchronized») или встроенных классов, которые помогают с этим (http://java.sun.com/docs/books/tutorial/essential/concurrency/highlevel.html).

0 голосов
/ 07 мая 2009

Как уже упоминалось несколькими людьми, вы не должны создавать свои собственные потоки в MDB (или любом другом типе EJB или сервлета в этом отношении).

Многие EJB-контейнеры фактически не позволяют вам создавать и запускать потоки. Хотя есть один безопасный способ сделать это, используя WorkManager из спецификации commonj, хотя я не вижу причин для этого в данном конкретном случае, поскольку MDB уже работает в своем собственном «рабочем потоке».

См. информацию о порождающих потоках здесь для получения дополнительной информации о том, почему вы не должны создавать потоки на сервере Java EE, и о том, как сделать это безопасно, когда вам нужно.

...