JMS - переход от одного к нескольким потребителям - PullRequest
23 голосов
/ 06 января 2011

У меня есть клиент JMS, который создает сообщения и отправляет их через очередь JMS своему уникальному потребителю.

Я хочу, чтобы эти сообщения получали не один потребитель. Первое, что приходит мне в голову, - это преобразование очереди в тему, чтобы нынешние и новые потребители могли подписаться и получить одинаковое сообщение для всех них.

Это, очевидно, потребует изменения текущего клиентского кода как на стороне производителя, так и на стороне потребителя.

Я хотел бы также рассмотреть другие варианты, такие как создание второй очереди, чтобы мне не приходилось изменять существующего потребителя. Я полагаю, что в этом подходе есть преимущества, такие как (поправьте меня, если я ошибаюсь), распределение нагрузки между двумя разными очередями, а не одной, что может оказать положительное влияние на производительность.

Я хотел бы получить совет по поводу этих вариантов и минусов / плюсов, которые вы можете увидеть. Любая обратная связь высоко ценится.

Ответы [ 2 ]

47 голосов
/ 06 января 2011

У вас есть несколько вариантов, как вы заявили.

Если вы преобразуете его в тему, чтобы получить тот же эффект, вам нужно будет сделать потребителей постоянными потребителями. Одна вещь, которую предлагает очередь - это постоянство, если ваш потребитель не жив. Это будет зависеть от используемой вами системы MQ.

Если вы хотите придерживаться очередей, вы создадите очередь для каждого потребителя и диспетчера, который будет прослушивать исходную очередь.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

Плюсы тем

  • Проще динамически добавлять новых потребителей. Все потребители будут получать новые сообщения без какой-либо работы.
  • Вы можете создавать циклические темы, чтобы Consumer_1 получал сообщение, затем Consumer_2, затем Consumer_3
  • Потребители могут получать новые сообщения, вместо того чтобы запрашивать очередь, что делает их реактивными.

Минусы тем

  • Сообщения не являются постоянными, если ваш брокер не поддерживает эту конфигурацию. Если потребитель отключается от сети и возвращается, возможно, пропущены сообщения, если постоянные потребители не настроены.
  • Трудно разрешить Consumer_1 и Consumer_2 получать сообщение, но не Consumer_3. С Диспетчером и Очередями Диспетчер не может поместить сообщение в очередь Consumer_3.

Плюсы очередей

  • Сообщения сохраняются до тех пор, пока Потребитель не удалит их
  • Диспетчер может фильтровать, какие потребители получают какие сообщения, не помещая сообщения в очереди соответствующих потребителей. Это можно сделать с помощью тем через фильтры.

Минусы очередей

  • Необходимо создать дополнительные очереди для поддержки нескольких потребителей. В динамичной среде это не будет эффективно.

При разработке системы обмена сообщениями я предпочитаю темы, поскольку они дают мне больше возможностей, но, поскольку вы уже используете очереди, вам потребуется изменить работу вашей системы, чтобы вместо этого реализовывать темы.

Проектирование и внедрение системы очередей с несколькими потребителями

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3

Источник

Имейте в виду, что есть другие вещи, о которых вам нужно позаботиться, такие как обработка исключений проблем, переподключение к соединению и очереди в случае потери соединения и т. Д. Это просто разработано, чтобы дать вам представление о том, как выполнить то, что я описал.

В реальной системе я, вероятно, не выйду из системы при первом исключении. Я бы позволил системе продолжать работать как можно лучше и регистрировать ошибки. В этом коде, если поместить сообщение в одну очередь потребителей не удастся, весь диспетчер остановится.

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();
4 голосов
/ 06 января 2011

Возможно, вам не придется изменять код;это зависит от того, как вы его написали.

Например, если ваш код отправляет сообщения, используя MessageProducer вместо QueueSender, он будет работать как для тем, так и для очередей.Аналогично, если вы использовали MessageConsumer вместо QueueReceiver.

По сути, в приложениях JMS рекомендуется использовать неспецифические интерфейсы для взаимодействия с системой JMS, такие как MessageProducer, MessageConsumer, Destination и т. Д. Если это так, то это "простой" вопрос конфигурации.

...