ACTIVEMQ - издатель подписчик привет мир пример - PullRequest
20 голосов
/ 26 января 2012

Существует две программы: подписчик и издатель ... Подписчик может поместить сообщение в тему, и сообщение успешно отправлено.Когда я проверяю сервер activemq в моем браузере, он показывает 1 msg в очереди.Но когда я запускаю потребительский код, он не получает сообщение

Вот код производителя:

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class producer {

    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testt");

        MessageProducer producer = session.createProducer(topic);

        // We will send a small text message saying 'Hello'

        TextMessage message = session.createTextMessage();

        message.setText("HELLO JMS WORLD");
        // Here we are sending the message!
        producer.send(message);
        System.out.println("Sent message '" + message.getText() + "'");

        connection.close();
    }
}

После запуска этого кода на консоли выводится:

26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Sent message 'HELLO JMS WORLD'

А вот код потребителя:

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testt");

        MessageConsumer consumer = session.createConsumer(topic);

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };

        consumer.setMessageListener(listner);
        connection.close();

    }
}    

После запуска этого кода он ничего не показывает.Может ли кто-нибудь помочь мне преодолеть эту проблему?

Ответы [ 4 ]

16 голосов
/ 26 января 2012

Ваша проблема в том, что ваш потребитель запускается, а затем немедленно выключается.

Попробуйте добавить это в ваш потребитель:

    consumer.setMessageListener(listner);

    try {
        System.in.read();
    } catch (IOException e) {
        e.printStackTrace();
    }

    connection.close();

Это будет ждать, пока вы не нажмете клавишу, прежде чем остановиться.

Другие вопросы для рассмотрения:

  • Используйте блок finally для закрытия
  • Соглашения об именах Java рекомендуют использовать заглавные буквы для первой буквы класса
12 голосов
/ 27 января 2012

Основная проблема (помимо быстрого закрытия приложения) заключается в том, что вы отправляете в раздел. Темы не сохраняют сообщения, поэтому, если вы запустите приложение, которое создает, а затем запустит потребителя, потребитель не получит ничего, потому что он не был подписан на тему во время отправки сообщения. Если вы решите проблему с отключением, а затем запустите потребителя в одном терминале, а затем запустите производителя, вы должны увидеть сообщение, полученное вашим потребителем. Если вы хотите сохранить сообщение, вам нужно использовать Очередь, которая будет удерживать сообщение до тех пор, пока кто-то его не использует.

3 голосов
/ 27 августа 2014

Ваш производитель класс правильный.Он работает гладко.

Но ваш потребитель неверен, и вы должны его изменить.

  • Сначала добавьте setClientID ("any_string_value ") после создания подключения объекта;

    например: Connection connection = connectionFactory.createConnection(); // need to setClientID value, any string value you wish connection.setClientID("12345");

  • во-вторых, используйте метод createDurableSubscriber () вместо createConsumer() для передачи сообщения через тему.

    MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");

Вот модифицированный потребительский класс:

package mq.test;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class consumer {
    // URL of the JMS server
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the topic from which we will receive messages from = " testt"

    public static void main(String[] args) throws JMSException {
        // Getting JMS connection from the server

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();

        // need to setClientID value, any string value you wish
        connection.setClientID("12345");

        try{
        connection.start();
        }catch(Exception e){
            System.err.println("NOT CONNECTED!!!");
        }
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("test_data");

        //need to use createDurableSubscriber() method instead of createConsumer() for topic
        // MessageConsumer consumer = session.createConsumer(topic);
        MessageConsumer consumer = session.createDurableSubscriber(topic,
                "SUB1234");

        MessageListener listner = new MessageListener() {
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message"
                                + textMessage.getText() + "'");
                    }
                } catch (JMSException e) {
                    System.out.println("Caught:" + e);
                    e.printStackTrace();
                }
            }
        };

        consumer.setMessageListener(listner);
        //connection.close();

    }
}

Теперь ваш код будет успешно выполнен.

2 голосов
/ 28 января 2012

только некоторые:

  • работа с очередью, а не с темой. сообщения в темах будут отбрасываться, если ни один потребитель не доступен, они НЕ являются постоянными.
  • добавить connection.start () после настройки прослушивателя сообщений. Вы должны установить соединение, когда все потребители / производители настроены правильно.
  • подождите некоторое время, прежде чем снова закрывать соединение.

тема, вероятно, будет вашим самым важным источником неудачи.

...