У меня есть один MessageProducer
и несколько MessageConsumer
в ActiveMQ. Я хочу, чтобы потребители ждали, пока производитель что-нибудь не опубликует. После этого потребители могут быть завершены. Я пытаюсь добиться этого с помощью синхронизации Java, но это не работает. Я вижу, что производитель отправляет сообщение, но потребители на него не реагируют.
Это мой код:
Класс производителя:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnection;
/**
 * Thread that publishes a single text message to a topic and then wakes up
 * every thread waiting on the shared lock.
 *
 * <p>The message is created, sent and announced while holding {@code lock},
 * so a thread that subscribes and calls {@code lock.wait()} inside the same
 * monitor either subscribes before the send starts or after it has finished.
 */
public class TopicProducer extends Thread {
    private final String producerMessage;
    private final Session session;
    private final Topic topic;
    private final Object lock;

    /**
     * @param producerMessage text to publish
     * @param session         session used to create the message and the producer
     * @param topic           destination topic
     * @param lock            monitor on which {@code notifyAll()} is called after publishing
     */
    public TopicProducer(String producerMessage, Session session, Topic topic,
            final Object lock) {
        this.producerMessage = producerMessage;
        this.session = session;
        this.topic = topic;
        this.lock = lock;
    }

    /**
     * Publishes the message and notifies all waiters on the lock.
     * A {@link JMSException} is reported on standard output; in that case
     * the waiters are not notified, because nothing was published.
     */
    @Override
    public void run() {
        try {
            synchronized (this.lock) {
                TextMessage msg = this.session.createTextMessage(this.producerMessage);
                MessageProducer producer = this.session.createProducer(this.topic);
                try {
                    System.out.println("TopicProducer: sending text:" + msg.getText());
                    producer.send(msg);
                    System.out.println("after publish");
                } finally {
                    // Release the producer whether or not the send succeeded.
                    producer.close();
                }
                this.lock.notifyAll();
            }
        } catch (JMSException e) {
            System.out.println(e);
        }
    }
}
Класс потребителя:
import javax.jms.*;
public class TopicConsumer extends Thread {
private Session session;
private Topic topic;
private String consumerName;
final private Object lock;
public TopicConsumer(Session session, Topic topic, String consumerName,
final Object lock) {
this.session = session;
this.topic = topic;
this.consumerName = consumerName;
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (this.lock) {
MessageConsumer consumer = this.session.createConsumer(this.topic);
consumer.setMessageListener(new ConsumerMessageListener(this.consumerName));
this.lock.wait();
}
} catch (Exception e) {
System.out.println(e);
}
}
}
Класс создателя соединения:
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.BasicConfigurator;
/**
 * Opens an ActiveMQ connection through JNDI, creates a session, a queue and a
 * topic, and exposes them through getters. {@link #main(String[])} runs a small
 * demo with three {@code TopicConsumer} threads and one {@code TopicProducer}.
 */
public class InitConnection {
    public static String QUEUE_NAME = "MyQueue";
    public static String ACTIVEMQ_INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    public static String ACTIVEMQ_PROVIDER_URL = "tcp://localhost:61616";
    public static String CONN_FACTORY = "ConnectionFactory";
    public static String TOPIC = "someTopic";

    /** Upper bound, in milliseconds, on how long the demo waits for all worker threads. */
    private static final long JOIN_TIMEOUT_MILLIS = 10000L;

    private ActiveMQConnection connection;
    private ActiveMQQueue queue;
    private Session session;
    private Topic topic;

    /**
     * Initializes the connection. A failure is reported on standard output and
     * leaves the not-yet-initialized fields {@code null}, so callers should
     * check the getters before use.
     */
    public InitConnection() {
        try {
            this.init();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /** Looks up the connection factory, starts a connection and creates the session, queue and topic. */
    private void init() throws JMSException, NamingException {
        // Obtain a JNDI connection
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, ACTIVEMQ_INITIAL_CONTEXT_FACTORY);
        props.setProperty(Context.PROVIDER_URL, ACTIVEMQ_PROVIDER_URL);
        InitialContext jndiContext = new InitialContext(props);
        ActiveMQConnectionFactory conFactory;
        try {
            // Look up a JMS connection factory
            conFactory = (ActiveMQConnectionFactory) jndiContext.lookup(CONN_FACTORY);
        } finally {
            // The context is only needed for the lookup.
            jndiContext.close();
        }
        // Getting JMS connection from the server and starting it
        this.connection = (ActiveMQConnection) conFactory.createConnection();
        this.connection.start();
        this.session = this.newSession();
        this.queue = new ActiveMQQueue(QUEUE_NAME);
        this.topic = this.session.createTopic(TOPIC);
    }

    /**
     * Creates a session on the current connection.
     * JMS messages are sent and received using a Session. The session created
     * here is non-transactional; to use transactions the first parameter
     * would have to be {@code true}.
     */
    private Session newSession() throws JMSException {
        return this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** @return the started connection, or {@code null} if initialization failed before it was created */
    public ActiveMQConnection getConnection() {
        return connection;
    }

    /** @return the queue named {@link #QUEUE_NAME}, or {@code null} if initialization failed */
    public ActiveMQQueue getQueue() {
        return queue;
    }

    /** @return the session created during initialization, or {@code null} if initialization failed */
    public Session getSession() {
        return session;
    }

    /** @return the topic named {@link #TOPIC}, or {@code null} if initialization failed */
    public Topic getTopic() {
        return topic;
    }

    /**
     * Waits for the given threads to finish, for at most
     * {@link #JOIN_TIMEOUT_MILLIS} in total, then interrupts any thread that
     * is still alive so that it cannot block forever.
     */
    private void joinThreads(Thread[] threads) {
        long deadline = System.currentTimeMillis() + JOIN_TIMEOUT_MILLIS;
        try {
            for (Thread thread : threads) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    // join(0) would wait forever, so stop once the budget is spent.
                    break;
                }
                thread.join(remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println(e);
        }
        for (Thread thread : threads) {
            if (thread.isAlive()) {
                thread.interrupt();
            }
        }
    }

    /**
     * Demo entry point: starts three consumers and one producer, waits for
     * them to finish and only then closes the connection.
     */
    public static void main(String[] args) {
        BasicConfigurator.configure(); //logs config
        InitConnection conn = new InitConnection();
        ActiveMQConnection connection = conn.getConnection();
        if (connection == null) {
            // Initialization failed; the constructor has already reported why.
            return;
        }
        try {
            Topic topic = conn.getTopic();
            if (topic == null) {
                // Initialization failed after connecting; the finally block closes the connection.
                return;
            }
            final Object lock = new Object();
            // A JMS session must not be used by several threads at once,
            // so every worker thread gets a session of its own.
            Thread[] threads = {
                new TopicConsumer(conn.newSession(), topic, "consumer1", lock),
                new TopicConsumer(conn.newSession(), topic, "consumer2", lock),
                new TopicConsumer(conn.newSession(), topic, "consumer3", lock),
                new TopicProducer("producerMessage", conn.newSession(), topic, lock)
            };
            // Consumers are started first and the producer last.
            for (Thread thread : threads) {
                thread.start();
            }
            // Closing the connection before the workers are done would tear
            // down their sessions before anything is sent or delivered.
            conn.joinThreads(threads);
        } catch (JMSException e) {
            System.out.println(e);
        } finally {
            try {
                connection.close();
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}