У меня есть два брокера ActiveMQ Artemis, работающих на разных машинах, составляющих простой кластер. Я использую приложение Java (очень базовое c) для создания и использования сообщений для анализа поведения кластера. Код Java выглядит следующим образом:
public void runExample() throws Exception {
InitialContext initialContext = null;
Connection connectionA = null;
try {
Properties properties = new Properties();
properties.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
properties.put("connectionFactory.ConnectionFactory", "udp://231.7.7.7:9876");
properties.put("queue.queue/anotherExampleQueue", "anotherExampleQueue");
initialContext = new InitialContext(properties);
Queue queue = (Queue) initialContext.lookup("queue/anotherExampleQueue");
ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
Thread.sleep(5000);
connectionA = connectionFactory.createConnection("admin", "admin");
Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("Session A - " + ((ClientSessionInternal)((org.apache.activemq.artemis.jms.client.ActiveMQSession) sessionA).getCoreSession()).getConnection().getRemoteAddress());
MessageProducer producerA = sessionA.createProducer(queue);
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage messageA = sessionA.createTextMessage("A:This is text message " + i);
producerA.send(messageA);
System.out.println("Sent message: " + messageA.getText());
}
connectionA.start();
consume(sessionA, queue, numMessages, "A");
} finally {
if (connectionA != null) {
connectionA.close();
}
if (initialContext != null) {
initialContext.close();
}
}
}
private static void consume(Session session, Queue queue, int numMessages, String node) throws JMSException {
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(2000);
if(message!=null)
System.out.println("Got message: " + message.getText() + " from node " + node);
}
System.out.println("receive other message from node " + node + ": " + consumer.receive(2000));
}
При отладке вышеуказанного приложения с точкой останова на connectionA.start()
. Если я остановлю свой главный брокер, я увижу, что подчиненный брокер взял на себя управление и все сообщения были перемещены подчиненному брокеру, как и ожидалось. Однако на этом этапе, если я продолжу работу со своим приложением, оно выдает javax.jms.IllegalStateException: AMQ219019: Session is closed
вместо того, чтобы принимать сообщения на подчиненном брокере. То же самое происходит, когда я снова запускаю своего главного брокера и продолжаю отладку. В документации говорится, что автоматическое переключение c клиента произойдет автоматически.
Вот фрагмент главного брокера. xml
<connectors>
<connector name="clusterConnectorOne">tcp://10.10.170.5:61616</connector>
</connectors>
<discovery-groups>
<discovery-group name="my-discovery-group">
<local-bind-address>10.10.170.5</local-bind-address>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>clusterConnectorOne</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>STRICT</message-load-balancing>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<local-bind-address>10.10.170.5</local-bind-address>
<local-bind-port>5432</local-bind-port>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<broadcast-period>2000</broadcast-period>
<connector-ref>clusterConnectorOne</connector-ref>
</broadcast-group>
</broadcast-groups>
<ha-policy>
<replication>
<master>
<check-for-live-server>true</check-for-live-server>
</master>
</replication>
</ha-policy>
Я не мог выяснить, что здесь не так, есть предложения?