Сообщение не используется всеми потребителями, когда сетевые брокеры настроены в ActiveMQ - PullRequest
0 голосов
/ 23 октября 2018

У меня есть 2 экземпляра моего приложения на одной машине (хотя это может быть и на разных машинах) с двумя экземплярами Tomcat с разными портами, и Apache ActiveMQ встроен в приложение.

Я настроилстатическая сеть посредников, так что сообщение из одного экземпляра может быть использовано и всеми другими экземплярами (каждый экземпляр может быть производителем и потребителем).


package com.activemq.servlet;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;

 * Servlet implementation class ActiveMQStartUpServlet
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private ActiveMQStartup mqStartup = null;
    private static final Map pooledPublishers = new HashMap();

    public void init(ServletConfig config) throws ServletException {
        System.out.println("starting servelt--------------");
        //Apache Active MQ Startup
        mqStartup = new ActiveMQStartup();


    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String mqConfig = null;
        String distributedMsg = req.getParameter("distributedMsg");
        String simpleMsg = req.getParameter("simpleMsg");
        if (distributedMsg != null && !distributedMsg.equals(""))
            mqConfig = "distributedMsg";
        else if (simpleMsg != null && !simpleMsg.equals(""))
            mqConfig = "simpleMsg";
        MQPublisher publisher = acquirePublisher(mqConfig);
        try {
        } catch (JMSException e) {
            // TODO Auto-generated catch block
        } finally {

    private void releasePublisher(MQPublisher publisher) {
        if (publisher == null) return;
        LinkedList publishers;
        TestPublisher poolablePublisher = (TestPublisher)publisher;
        publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
        synchronized (publishers) {


    private MQPublisher acquirePublisher(String mqConfig) {
        LinkedList publishers = getPooledPublishers(mqConfig);
        MQPublisher publisher = getMQPubliser(publishers);
        if (publisher != null) return publisher;
        try {
            if (mqConfig.equals("distributedMsg"))
                return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
                return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
        }catch(Exception e){
        return null;

    private LinkedList getPooledPublishers(String mqConfig) {
         LinkedList publishers = null;
         publishers = (LinkedList) pooledPublishers.get(mqConfig);
         if (publishers == null) {
             synchronized(pooledPublishers) {
                 publishers = (LinkedList) pooledPublishers.get(mqConfig);
                 if (publishers == null) {
                     publishers = new LinkedList();
                     pooledPublishers.put(mqConfig, publishers);
        return publishers;

    private MQPublisher getMQPubliser(LinkedList publishers) {
        synchronized (publishers) {
            while (!publishers.isEmpty()) {
                TestPublisher publisher = (TestPublisher)publishers.removeFirst();
                return publisher;
        return null;



package com.activemq.servlet;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.ActiveMQContext;

public class MQConfiguration {
    private static final Map configurations = new HashMap();
    private String mqConfig;
    private String topicName;
    private TopicConnection topicConnection = null;

    private MQConfiguration(String mqConfig, String string, String string2) {
        this.mqConfig = mqConfig;

        try {
            String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
            this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
            TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
            this.topicConnection = factory.createTopicConnection();
        } catch (Exception e) {
            System.out.println("error: " + e);

    public static MQConfiguration getConfiguration(String mqConfig) {
        if (mqConfig == null || "".equals(mqConfig)) {
            throw new IllegalArgumentException("mqConfig is null or empty");

        MQConfiguration config = null;

        if (config != null) {
            return config;
        synchronized (configurations) {
            config = (MQConfiguration) configurations.get(mqConfig);
            if (config == null) {
                config = new MQConfiguration(mqConfig, "userName", "userPassword");
            configurations.put(mqConfig, config);

        return config;

    public String getMqConfig() {
        return this.mqConfig;

    public TopicSession createTopicSession(boolean isTransacted, int autoAcknowledge) throws JMSException {
        if (this.topicConnection == null) {
            IllegalStateException ise = new IllegalStateException("topic connection not configured");
            throw ise;
        return this.topicConnection.createTopicSession(isTransacted, autoAcknowledge);

    public Topic getTopic() {
        try {
            return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
        } catch (Exception e) {
        return null;


package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import com.activemq.servlet.MQConfiguration;

public class TestPublisher implements MQPublisher {
    private final String configurationName;
    private TopicSession topicSession = null;
    private TopicPublisher topicPublisher = null;

    public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
        if (config == null) {
            throw new IllegalArgumentException("config == null");
        Topic topic = config.getTopic();
        this.configurationName = config.getMqConfig();
        this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        this.topicPublisher = this.topicSession.createPublisher(topic);
        MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
        msgConsumer.setMessageListener((MessageListener) messageListener);

    public void publish(String msg) throws JMSException {
        this.topicPublisher.publish(createMessage(msg, this.topicSession));

    private Message createMessage(String msg, Session session) throws JMSException {
        TextMessage message = session.createTextMessage(msg);
        return message;

    public String getConfigurationName() {
        return this.configurationName;


package com.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;

public class SendMsgToAllInstance implements MessageListener {

    public void onMessage(Message arg0) {
        System.out.println("distributed message-------------");

        // We have call to dao layer to to fetch some data and cached it



JNDI: activemq-jndi.properties

# JNDI properties file to setup the JNDI server within ActiveMQ

# Default JNDI properties settings


# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory



ActiveMQStartup :

package com.activemq;

import java.net.URI;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;

public class ActiveMQStartup {
    private final String bindAddress;
    private final String dataDirectory;
    private BrokerService broker = new BrokerService();
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final int consumerTTL = 2;
    protected final boolean dynamicOnly = true;
    protected final String networkBroker;
    protected final String jmxPort;

    public ActiveMQStartup() {
        ActiveMQContext context = new ActiveMQContext();
        bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
        dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
        networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
        jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");

    // Start activemq broker service
    public void startBrokerService() {
        try {
            broker.setDataDirectory("../" + dataDirectory);
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(bindAddress));         

            //broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            ManagementContext mgContext = new ManagementContext();
            if (networkBroker != null && !networkBroker.isEmpty()) {
                NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
        } catch (Exception e) {
            System.out.println("Failed to start Apache MQ Broker : " + e);

    private void configureNetworkConnector(NetworkConnector networkConnector) {

    // Stop broker service
    public void stopBrokerService() {
        try {
        } catch (Exception e) {
            System.out.println("Unable to stop the ApacheMQ Broker service " + e);

Я запускаю экземпляр tomcat один за другим и вижу, что сетевое соединение между брокером устанавливается.

Когда я отправляю сообщения из instance1 или instance2(первый раз) он потребляет только этот экземпляр, но когда я отправляю сообщение из второго экземпляра, он используется обоими;

Код в git: https://github.com/AratRana/ApacheActiveMQ

МожетВы указываете мне, где я не прав?

1 Ответ

0 голосов
/ 01 ноября 2018

Наконец я могу это сделать.Когда я запускал получателя во время запуска сервера, я мог видеть получателя сообщений во всех случаях.Таким образом, для достижения этой цели, потребители должны быть запущены перед публикацией любого сообщения.
