У меня есть 2 экземпляра моего приложения на одной машине (хотя это может быть и на разных машинах) с двумя экземплярами Tomcat с разными портами, и Apache ActiveMQ встроен в приложение.
Я настроилстатическая сеть посредников, так что сообщение из одного экземпляра может быть использовано и всеми другими экземплярами (каждый экземпляр может быть производителем и потребителем).
servlet:
package com.activemq.servlet;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;
/**
* Servlet implementation class ActiveMQStartUpServlet
*/
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private ActiveMQStartup mqStartup = null;
private static final Map pooledPublishers = new HashMap();
@Override
public void init(ServletConfig config) throws ServletException {
System.out.println("starting servelt--------------");
super.init(config);
//Apache Active MQ Startup
mqStartup = new ActiveMQStartup();
mqStartup.startBrokerService();
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
System.out.println(req.getParameter("distributedMsg"));
String mqConfig = null;
String distributedMsg = req.getParameter("distributedMsg");
String simpleMsg = req.getParameter("simpleMsg");
if (distributedMsg != null && !distributedMsg.equals(""))
mqConfig = "distributedMsg";
else if (simpleMsg != null && !simpleMsg.equals(""))
mqConfig = "simpleMsg";
MQPublisher publisher = acquirePublisher(mqConfig);
try {
publisher.publish(mqConfig);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
releasePublisher(publisher);
}
}
@SuppressWarnings("unchecked")
private void releasePublisher(MQPublisher publisher) {
if (publisher == null) return;
@SuppressWarnings("rawtypes")
LinkedList publishers;
TestPublisher poolablePublisher = (TestPublisher)publisher;
publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
synchronized (publishers) {
publishers.addLast(poolablePublisher);
}
}
private MQPublisher acquirePublisher(String mqConfig) {
LinkedList publishers = getPooledPublishers(mqConfig);
MQPublisher publisher = getMQPubliser(publishers);
if (publisher != null) return publisher;
try {
if (mqConfig.equals("distributedMsg"))
return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
else
return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
}catch(Exception e){
e.printStackTrace();
}
return null;
}
private LinkedList getPooledPublishers(String mqConfig) {
LinkedList publishers = null;
publishers = (LinkedList) pooledPublishers.get(mqConfig);
if (publishers == null) {
synchronized(pooledPublishers) {
publishers = (LinkedList) pooledPublishers.get(mqConfig);
if (publishers == null) {
publishers = new LinkedList();
pooledPublishers.put(mqConfig, publishers);
}
}
}
return publishers;
}
private MQPublisher getMQPubliser(LinkedList publishers) {
synchronized (publishers) {
while (!publishers.isEmpty()) {
TestPublisher publisher = (TestPublisher)publishers.removeFirst();
return publisher;
}
}
return null;
}
}
Конфигурация:
package com.activemq.servlet;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.activemq.ActiveMQContext;
public class MQConfiguration {
private static final Map configurations = new HashMap();
private String mqConfig;
private String topicName;
private TopicConnection topicConnection = null;
private MQConfiguration(String mqConfig, String string, String string2) {
this.mqConfig = mqConfig;
try {
String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
.lookup(topicFactoryConName);
this.topicConnection = factory.createTopicConnection();
this.topicConnection.start();
} catch (Exception e) {
System.out.println("error: " + e);
}
}
public static MQConfiguration getConfiguration(String mqConfig) {
if (mqConfig == null || "".equals(mqConfig)) {
throw new IllegalArgumentException("mqConfig is null or empty");
}
MQConfiguration config = null;
if (config != null) {
return config;
}
synchronized (configurations) {
config = (MQConfiguration) configurations.get(mqConfig);
if (config == null) {
config = new MQConfiguration(mqConfig, "userName", "userPassword");
}
configurations.put(mqConfig, config);
}
return config;
}
public String getMqConfig() {
return this.mqConfig;
}
public TopicSession createTopicSession(boolean isTransacted, int autoAcknowledge) throws JMSException {
if (this.topicConnection == null) {
IllegalStateException ise = new IllegalStateException("topic connection not configured");
throw ise;
}
return this.topicConnection.createTopicSession(isTransacted, autoAcknowledge);
}
public Topic getTopic() {
try {
return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
} catch (Exception e) {
e.getMessage();
}
return null;
}
}
издатель:
package com.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import com.activemq.servlet.MQConfiguration;
public class TestPublisher implements MQPublisher {
private final String configurationName;
private TopicSession topicSession = null;
private TopicPublisher topicPublisher = null;
public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
if (config == null) {
throw new IllegalArgumentException("config == null");
}
Topic topic = config.getTopic();
this.configurationName = config.getMqConfig();
this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
this.topicPublisher = this.topicSession.createPublisher(topic);
MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
msgConsumer.setMessageListener((MessageListener) messageListener);
}
@Override
public void publish(String msg) throws JMSException {
this.topicPublisher.publish(createMessage(msg, this.topicSession));
}
private Message createMessage(String msg, Session session) throws JMSException {
TextMessage message = session.createTextMessage(msg);
return message;
}
public String getConfigurationName() {
return this.configurationName;
}
}
Потребитель:
package com.activemq;
import javax.jms.Message;
import javax.jms.MessageListener;
public class SendMsgToAllInstance implements MessageListener {
@Override
public void onMessage(Message arg0) {
System.out.println("distributed message-------------");
// We have call to dao layer to to fetch some data and cached it
}
}
JNDI: activemq-jndi.properties
# JNDI properties file to setup the JNDI server within ActiveMQ
#
# Default JNDI properties settings
#
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
activemq.network.connector=static:(tcp://localhost:61620)
#activemq.network.connector=broker:(tcp://localhost:61619,network:static:tcp://localhost:61620)?persistent=false&useJmx=true
activemq.data.directory=data61619
activemq.jmx.port=1099
#
# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
#
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory
topic.jms/distributedTopic=distributedTopic
topic.jms/normalTopic=normalTopic
distributedMsg=distributedMsgFactory
simpleMsg=simpleMsgFactory
distributedTopic=jms/distributedTopic
normalTopic=jms/normalTopic
ActiveMQStartup :
package com.activemq;
import java.net.URI;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;
public class ActiveMQStartup {
private final String bindAddress;
private final String dataDirectory;
private BrokerService broker = new BrokerService();
protected final int numRestarts = 3;
protected final int networkTTL = 2;
protected final int consumerTTL = 2;
protected final boolean dynamicOnly = true;
protected final String networkBroker;
protected final String jmxPort;
public ActiveMQStartup() {
ActiveMQContext context = new ActiveMQContext();
context.loadJndiProperties();
bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");
}
// Start activemq broker service
public void startBrokerService() {
try {
broker.setDataDirectory("../" + dataDirectory);
broker.setBrokerName(dataDirectory);
broker.setUseShutdownHook(true);
TransportConnector connector = new TransportConnector();
connector.setUri(new URI(bindAddress));
//broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
ManagementContext mgContext = new ManagementContext();
if (networkBroker != null && !networkBroker.isEmpty()) {
NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
networkConnector.setName(dataDirectory);
mgContext.setConnectorPort(Integer.parseInt(jmxPort));
broker.setManagementContext(mgContext);
configureNetworkConnector(networkConnector);
}
broker.setNetworkConnectorStartAsync(true);
broker.addConnector(connector);
broker.start();
} catch (Exception e) {
System.out.println("Failed to start Apache MQ Broker : " + e);
}
}
private void configureNetworkConnector(NetworkConnector networkConnector) {
networkConnector.setDuplex(true);
networkConnector.setNetworkTTL(networkTTL);
networkConnector.setDynamicOnly(dynamicOnly);
networkConnector.setConsumerTTL(consumerTTL);
//networkConnector.setStaticBridge(true);
}
// Stop broker service
public void stopBrokerService() {
try {
broker.stop();
} catch (Exception e) {
System.out.println("Unable to stop the ApacheMQ Broker service " + e);
}
}
}
Я запускаю экземпляр tomcat один за другим и вижу, что сетевое соединение между брокером устанавливается.
Когда я отправляю сообщения из instance1 или instance2(первый раз) он потребляет только этот экземпляр, но когда я отправляю сообщение из второго экземпляра, он используется обоими;
Код в git: https://github.com/AratRana/ApacheActiveMQ
МожетВы указываете мне, где я не прав?