Очередь - это простая концепция: несколько источников записывают в очередь, и один потребитель читает сообщения одно за другим в порядке их получения.Попытка ввести произвольный доступ запутывает концепцию и идет вразрез с намерением с очередью.
Если вы не можете изменить получателя для удаления или сортировки сообщений, то введите промежуточную очередь и компонент, управляемый сообщениями (MDB) для выполнения работы: MDB будет принимать сообщения в очереди Q
, отбрасывать определенные сообщения и переупорядочивать другие сообщения перед публикацией сообщений в очередь Q'
.
До:
Q -> orignal consumer
После:
Q -> your filtering and sorting MDB -> Q' -> original consumer
Это сохраняет намерение компонентов в вашем дизайне и, на мой взгляд, намного проще объяснить и понять.
Редактировать : Ваш MDB может выглядеть примерно так, как показано ниже (на основе учебника по Java Enterprise Edition 6 ).Учебное пособие также содержит информацию об упаковке и развертывании MDB.
// MDB to consume messages on the original queue
@MessageDriven(mappedName="jms/IncomingQueue", activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType",
propertyValue = "javax.jms.Queue")
})
public class MyMDB implements MessageListener {
@EJB
private MessageFilter messageFilter;
public void onMessage(Message message) {
// pass on to MessageFilter bean for processing
messageFilter.filter(message);
}
}
// singleton bean to filter and sort messages, then re-publish to the original consumer
// if singleton doesn't work in your environment then you might have to persist the
// messages using e.g. JPA
@Singleton
public class MessageFilter {
@Resource(name="jms/OutgoingQueue")
Queue outgoingQueue;
@Resource(name="jms/QueueConnectionFactory")
QueueConnectionFactory qcf;
// accept incoming message from the MDB
void filter(Message message) {
// filter and sort messages
...
// send to queue read by the original consumer
send(message);
}
// send message to the filtered & sorted queue for the original consumer
void send(Message message) {
QueueConnection queueConnection = qcf.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = queueSession(outgoingQueue);
queueSender.send(message);
}
}
Учебное руководство по Java EE 6 также содержит примеры , как создавать синглтон-компоненты , и вот учебное пособие для подключенияв очередь для отправки сообщения .