Сообщение не используется всеми потребителями, когда сетевые брокеры настроены в ActiveMQ - PullRequest
0 голосов
/ 23 октября 2018

У меня есть 2 экземпляра моего приложения на одной машине (хотя это может быть и на разных машинах) с двумя экземплярами Tomcat с разными портами, и Apache ActiveMQ встроен в приложение.

Я настроилстатическая сеть посредников, так что сообщение из одного экземпляра может быть использовано и всеми другими экземплярами (каждый экземпляр может быть производителем и потребителем).

servlet:

package com.activemq.servlet;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;

/**
 * Servlet implementation class ActiveMQStartUpServlet
 */
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private ActiveMQStartup mqStartup = null;
    private static final Map pooledPublishers = new HashMap();

    @Override
    public void init(ServletConfig config) throws ServletException {
        System.out.println("starting servelt--------------");
        super.init(config);
        //Apache Active MQ Startup
        mqStartup = new ActiveMQStartup();
        mqStartup.startBrokerService();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println(req.getParameter("distributedMsg"));
        String mqConfig = null;
        String distributedMsg = req.getParameter("distributedMsg");
        String simpleMsg = req.getParameter("simpleMsg");
        if (distributedMsg != null && !distributedMsg.equals(""))
            mqConfig = "distributedMsg";
        else if (simpleMsg != null && !simpleMsg.equals(""))
            mqConfig = "simpleMsg";
        MQPublisher publisher = acquirePublisher(mqConfig);
        try {
            publisher.publish(mqConfig);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            releasePublisher(publisher);
        }
    }

    @SuppressWarnings("unchecked")
    private void releasePublisher(MQPublisher publisher) {
        if (publisher == null) return;
        @SuppressWarnings("rawtypes")
        LinkedList publishers;
        TestPublisher poolablePublisher = (TestPublisher)publisher;
        publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
        synchronized (publishers) {
            publishers.addLast(poolablePublisher);
        }

    }

    private MQPublisher acquirePublisher(String mqConfig) {
        LinkedList publishers = getPooledPublishers(mqConfig);
        MQPublisher publisher = getMQPubliser(publishers);
        if (publisher != null) return publisher;
        try {
            if (mqConfig.equals("distributedMsg"))
                return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
            else    
                return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
        }catch(Exception e){
            e.printStackTrace();
        }
        return null;
    }

    private LinkedList getPooledPublishers(String mqConfig) {
         LinkedList publishers = null;
         publishers = (LinkedList) pooledPublishers.get(mqConfig);
         if (publishers == null) {
             synchronized(pooledPublishers) {
                 publishers = (LinkedList) pooledPublishers.get(mqConfig);
                 if (publishers == null) {
                     publishers = new LinkedList();
                     pooledPublishers.put(mqConfig, publishers);
                 }
             }
         }
        return publishers;
    }

    private MQPublisher getMQPubliser(LinkedList publishers) {
        synchronized (publishers) {
            while (!publishers.isEmpty()) {
                TestPublisher publisher = (TestPublisher)publishers.removeFirst();
                return publisher;
            }
        }
        return null;
    }




}

Конфигурация:

package com.activemq.servlet;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.ActiveMQContext;

public class MQConfiguration {
    private static final Map configurations = new HashMap();
    private String mqConfig;
    private String topicName;
    private TopicConnection topicConnection = null;

    private MQConfiguration(String mqConfig, String string, String string2) {
        this.mqConfig = mqConfig;

        try {
            String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
            this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
            TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
                    .lookup(topicFactoryConName);
            this.topicConnection = factory.createTopicConnection();
            this.topicConnection.start();
        } catch (Exception e) {
            System.out.println("error: " + e);
        }
    }

    public static MQConfiguration getConfiguration(String mqConfig) {
        if (mqConfig == null || "".equals(mqConfig)) {
            throw new IllegalArgumentException("mqConfig is null or empty");
        }

        MQConfiguration config = null;

        if (config != null) {
            return config;
        }
        synchronized (configurations) {
            config = (MQConfiguration) configurations.get(mqConfig);
            if (config == null) {
                config = new MQConfiguration(mqConfig, "userName", "userPassword");
            }
            configurations.put(mqConfig, config);
        }

        return config;
    }

    public String getMqConfig() {
        return this.mqConfig;
    }

    public TopicSession createTopicSession(boolean isTransacted, int autoAcknowledge) throws JMSException {
        if (this.topicConnection == null) {
            IllegalStateException ise = new IllegalStateException("topic connection not configured");
            throw ise;
        }
        return this.topicConnection.createTopicSession(isTransacted, autoAcknowledge);
    }

    public Topic getTopic() {
        try {
            return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
        } catch (Exception e) {
            e.getMessage();
        }
        return null;
    }
}

издатель:

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import com.activemq.servlet.MQConfiguration;

public class TestPublisher implements MQPublisher {
    private final String configurationName;
    private TopicSession topicSession = null;
    private TopicPublisher topicPublisher = null;

    public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
        if (config == null) {
            throw new IllegalArgumentException("config == null");
        }
        Topic topic = config.getTopic();
        this.configurationName = config.getMqConfig();
        this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        this.topicPublisher = this.topicSession.createPublisher(topic);
        MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
        msgConsumer.setMessageListener((MessageListener) messageListener);
    }

    @Override
    public void publish(String msg) throws JMSException {
        this.topicPublisher.publish(createMessage(msg, this.topicSession));
    }

    private Message createMessage(String msg, Session session) throws JMSException {
        TextMessage message = session.createTextMessage(msg);
        return message;
    }

    public String getConfigurationName() {
        return this.configurationName;
    }
}

Потребитель:

package com.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;

public class SendMsgToAllInstance implements MessageListener {

    @Override
    public void onMessage(Message arg0) {
        System.out.println("distributed message-------------");

        // We have call to dao layer to to fetch some data and cached it

    }

}

JNDI: activemq-jndi.properties

# JNDI properties file to setup the JNDI server within ActiveMQ

#
# Default JNDI properties settings
#
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
activemq.network.connector=static:(tcp://localhost:61620)

#activemq.network.connector=broker:(tcp://localhost:61619,network:static:tcp://localhost:61620)?persistent=false&useJmx=true
activemq.data.directory=data61619
activemq.jmx.port=1099

#
# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
#
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory
topic.jms/distributedTopic=distributedTopic
topic.jms/normalTopic=normalTopic

distributedMsg=distributedMsgFactory
simpleMsg=simpleMsgFactory

distributedTopic=jms/distributedTopic
normalTopic=jms/normalTopic

ActiveMQStartup :

package com.activemq;

import java.net.URI;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;

public class ActiveMQStartup {
    private final String bindAddress;
    private final String dataDirectory;
    private BrokerService broker = new BrokerService();
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final int consumerTTL = 2;
    protected final boolean dynamicOnly = true;
    protected final String networkBroker;
    protected final String jmxPort;

    public ActiveMQStartup() {
        ActiveMQContext context = new ActiveMQContext();
        context.loadJndiProperties();
        bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
        dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
        networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
        jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");
    }

    // Start activemq broker service
    public void startBrokerService() {
        try {
            broker.setDataDirectory("../" + dataDirectory);
            broker.setBrokerName(dataDirectory);
            broker.setUseShutdownHook(true);
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(bindAddress));         

            //broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            ManagementContext mgContext = new ManagementContext();
            if (networkBroker != null && !networkBroker.isEmpty()) {
                NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
                networkConnector.setName(dataDirectory);
                mgContext.setConnectorPort(Integer.parseInt(jmxPort));
                broker.setManagementContext(mgContext);
                configureNetworkConnector(networkConnector);
            }
            broker.setNetworkConnectorStartAsync(true);
            broker.addConnector(connector);
            broker.start();
        } catch (Exception e) {
            System.out.println("Failed to start Apache MQ Broker : " + e);
        }
    }

    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(true);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
        networkConnector.setConsumerTTL(consumerTTL);
        //networkConnector.setStaticBridge(true);
    }

    // Stop broker service
    public void stopBrokerService() {
        try {
            broker.stop();
        } catch (Exception e) {
            System.out.println("Unable to stop the ApacheMQ Broker service " + e);
        }
    }
}

Я запускаю экземпляр tomcat один за другим и вижу, что сетевое соединение между брокером устанавливается.

Когда я отправляю сообщения из instance1 или instance2(первый раз) он потребляет только этот экземпляр, но когда я отправляю сообщение из второго экземпляра, он используется обоими;

Код в git: https://github.com/AratRana/ApacheActiveMQ

МожетВы указываете мне, где я не прав?

1 Ответ

0 голосов
/ 01 ноября 2018

Наконец я могу это сделать.Когда я запускал получателя во время запуска сервера, я мог видеть получателя сообщений во всех случаях.Таким образом, для достижения этой цели, потребители должны быть запущены перед публикацией любого сообщения.

...