У вас есть несколько вариантов, как вы заявили.
Если вы преобразуете его в тему, чтобы получить тот же эффект, вам нужно будет сделать потребителей постоянными потребителями. Одна вещь, которую предлагает очередь - это постоянство, если ваш потребитель не жив. Это будет зависеть от используемой вами системы MQ.
Если вы хотите придерживаться очередей, вы создадите очередь для каждого потребителя и диспетчера, который будет прослушивать исходную очередь.
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
Плюсы тем
- Проще динамически добавлять новых потребителей. Все потребители будут получать новые сообщения без какой-либо работы.
- Вы можете создавать циклические темы, чтобы Consumer_1 получал сообщение, затем Consumer_2, затем Consumer_3
- Потребители могут получать новые сообщения, вместо того чтобы запрашивать очередь, что делает их реактивными.
Минусы тем
- Сообщения не являются постоянными, если ваш брокер не поддерживает эту конфигурацию. Если потребитель отключается от сети и возвращается, возможно, пропущены сообщения, если постоянные потребители не настроены.
- Трудно разрешить Consumer_1 и Consumer_2 получать сообщение, но не Consumer_3. С Диспетчером и Очередями Диспетчер не может поместить сообщение в очередь Consumer_3.
Плюсы очередей
- Сообщения сохраняются до тех пор, пока Потребитель не удалит их
- Диспетчер может фильтровать, какие потребители получают какие сообщения, не помещая сообщения в очереди соответствующих потребителей. Это можно сделать с помощью тем через фильтры.
Минусы очередей
- Необходимо создать дополнительные очереди для поддержки нескольких потребителей. В динамичной среде это не будет эффективно.
При разработке системы обмена сообщениями я предпочитаю темы, поскольку они дают мне больше возможностей, но, поскольку вы уже используете очереди, вам потребуется изменить работу вашей системы, чтобы вместо этого реализовывать темы.
Проектирование и внедрение системы очередей с несколькими потребителями
Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
-> Queue_Consumer_2 <- Consumer_2
-> Queue_Consumer_3 <- Consumer_3
Источник
Имейте в виду, что есть другие вещи, о которых вам нужно позаботиться, такие как обработка исключений проблем, переподключение к соединению и очереди в случае потери соединения и т. Д. Это просто разработано, чтобы дать вам представление о том, как выполнить то, что я описал.
В реальной системе я, вероятно, не выйду из системы при первом исключении. Я бы позволил системе продолжать работать как можно лучше и регистрировать ошибки. В этом коде, если поместить сообщение в одну очередь потребителей не удастся, весь диспетчер остановится.
Dispatcher.java
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package stackoverflow_4615895;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
public class Dispatcher {
private static long QUEUE_WAIT_TIME = 1000;
private boolean mStop = false;
private QueueConnectionFactory mFactory;
private String mSourceQueueName;
private String[] mConsumerQueueNames;
/**
* Create a dispatcher
* @param factory
* The QueueConnectionFactory in which new connections, session, and consumers
* will be created. This is needed to ensure the connection is associated
* with the correct thread.
* @param source
*
* @param consumerQueues
*/
public Dispatcher(
QueueConnectionFactory factory,
String sourceQueue,
String[] consumerQueues) {
mFactory = factory;
mSourceQueueName = sourceQueue;
mConsumerQueueNames = consumerQueues;
}
public void start() {
Thread thread = new Thread(new Runnable() {
public void run() {
Dispatcher.this.run();
}
});
thread.setName("Queue Dispatcher");
thread.start();
}
public void stop() {
mStop = true;
}
private void run() {
QueueConnection connection = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
QueueSession session = null;
try {
// Setup connection and queues for receiving the messages
connection = mFactory.createQueueConnection();
session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue sourceQueue = session.createQueue(mSourceQueueName);
consumer = session.createConsumer(sourceQueue);
// Create a null producer allowing us to send messages
// to any queue.
producer = session.createProducer(null);
// Create the destination queues based on the consumer names we
// were given.
Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
for (int index = 0; index < mConsumerQueueNames.length; ++index) {
destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
}
connection.start();
while (!mStop) {
// Only wait QUEUE_WAIT_TIME in order to give
// the dispatcher a chance to see if it should
// quit
Message m = consumer.receive(QUEUE_WAIT_TIME);
if (m == null) {
continue;
}
// Take the message we received and put
// it in each of the consumers destination
// queues for them to process
for (Queue q : destinationQueues) {
producer.send(q, m);
}
}
} catch (JMSException ex) {
// Do wonderful things here
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException ex) {
}
}
if (consumer != null) {
try {
consumer.close();
} catch (JMSException ex) {
}
}
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
}
}
}
}
}
Main.java
QueueConnectionFactory factory = ...;
Dispatcher dispatcher =
new Dispatcher(
factory,
"Queue_Original",
new String[]{
"Consumer_Queue_1",
"Consumer_Queue_2",
"Consumer_Queue_3"});
dispatcher.start();