Как использовать синхронизацию, чтобы поддерживать MessageConsumer в JMS / ActiveMQ? - PullRequest
0 голосов
/ 25 апреля 2019

У меня есть один MessageProducer и несколько MessageConsumer в ActiveMQ. Я хочу, чтобы потребитель подождал, пока производитель что-нибудь опубликовал. Тогда потребители могут быть прекращены. Я пытаюсь использовать синхронизацию Java для достижения этой цели, но она не работает. Я вижу, что производитель что-то производит, но потребители не реагируют на сообщение.

Это мой код:

Класс производителя:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;

public class TopicProducer extends Thread {
    private final String producerMessage;
    private ActiveMQConnection connection;
    private Session session;
    private Topic topic;
    final private Object lock;

    public TopicProducer(String producerMessage, Session session, Topic topic,
                         final Object lock) {
        this.producerMessage = producerMessage;
        this.session = session;
        this.topic = topic;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                Message msg = this.session.createTextMessage(this.producerMessage);
                MessageProducer producer = this.session.createProducer(this.topic);
                System.out.println("TopicProducer: sending text:" + ((TextMessage) msg).getText());
                producer.send(msg);
                System.out.println("after publish");
                this.lock.notifyAll();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Потребительский класс

import javax.jms.*;

public class TopicConsumer extends Thread {
    private Session session;
    private Topic topic;
    private String consumerName;
    final private Object lock;

    public TopicConsumer(Session session, Topic topic, String consumerName,
                         final Object lock) {
        this.session = session;
        this.topic = topic;
        this.consumerName = consumerName;
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                MessageConsumer consumer = this.session.createConsumer(this.topic);
                consumer.setMessageListener(new ConsumerMessageListener(this.consumerName));
                this.lock.wait();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

Класс создателя соединения:

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.BasicConfigurator;

public class InitConnection {
    public static String QUEUE_NAME = "MyQueue";
    public static String ACTIVEMQ_INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    public static String ACTIVEMQ_PROVIDER_URL = "tcp://localhost:61616";
    public static String CONN_FACTORY = "ConnectionFactory";
    public static String TOPIC = "someTopic";

    private ActiveMQConnection connection;
    private ActiveMQQueue queue;
    private Session session;
    private Topic topic;

    public InitConnection() {
        try {
            this.init();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    private void init() throws JMSException, NamingException {
        // Obtain a JNDI connection
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, ACTIVEMQ_INITIAL_CONTEXT_FACTORY);
        props.setProperty(Context.PROVIDER_URL, ACTIVEMQ_PROVIDER_URL);
        InitialContext jndiContext = new InitialContext(props);

        // Look up a JMS connection factory
        ActiveMQConnectionFactory conFactory = (ActiveMQConnectionFactory) jndiContext
                .lookup(CONN_FACTORY);

        // Getting JMS connection from the server and starting it
        this.connection = (ActiveMQConnection) conFactory.createConnection();
        this.connection.start();
        // JMS messages are sent and received using a Session. We will
        // create here a non-transactional session object. If you want
        // to use transactions you should set the first parameter to 'true'
        this.session = this.connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);

        this.queue = new ActiveMQQueue(QUEUE_NAME);
        this.topic = session.createTopic(TOPIC);
    }

    public ActiveMQConnection getConnection() {
        return connection;
    }

    public ActiveMQQueue getQueue() {
        return queue;
    }

    public Session getSession() {
        return session;
    }


    public Topic getTopic() {
        return topic;
    }

    private void joinThreads(Thread[] threads) {
        try {
            for (int i = 0; i < threads.length; i++) {
                threads[i].join();
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public static void main(String[] args) {
        BasicConfigurator.configure(); //logs config
        InitConnection conn = new InitConnection();
        final Object lock = new Object();

        TopicProducer tp = new TopicProducer("producerMessage",
                conn.getSession(), conn.getTopic(), lock);
        TopicConsumer tc1 = new TopicConsumer(conn.getSession(),
                conn.getTopic(), "consumer1", lock);
        TopicConsumer tc2 = new TopicConsumer(conn.getSession(),
                conn.getTopic(), "consumer2", lock);
        TopicConsumer tc3 = new TopicConsumer(conn.getSession(),
                conn.getTopic(), "consumer3", lock);

        tc1.start();
        tc2.start();
        tc3.start();
        tp.start();

        try {
            conn.getConnection().close();
        } catch (Exception e) {
            System.out.println(e);
        }

    }
}

1 Ответ

1 голос
/ 25 апреля 2019

Не используйте для этого синхронизацию потоков - это совершенно неправильно.

Вы реализовали потребитель в качестве слушателя, этот является асинхронным. Вместо использования слушателя, используйте метод receive: https://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQMessageConsumer.html#receive--

Этот метод будет блокироваться до тех пор, пока вы не отправите сообщение, после чего он получит это сообщение и продолжит работу.

...