Как правильно настроить перераспределение multicast-сообщений в кластере Artemis - PullRequest
0 голосов
/ 21 мая 2019

Я использую Artemis 2.8.0.

Я запустил два автономных сервера в режиме симметричного кластера, развернул на обоих адрес с типом 'multicast' и создал несколько предопределённых очередей, привязанных к этому адресу. Когда я отправлял сообщения на этот адрес на первом сервере, они успешно попадали во все очереди, привязанные к адресу. После этого я подключился ко второму серверу и создал потребителя для одной из очередей, но сообщения с первого сервера на второй не перераспределялись.

Не могу понять, ожидаемое это поведение или нет.

Я также пытался подключить потребителя по FQQN, но результат был тот же. В документации нет отдельной информации о перераспределении сообщений для «multicast»-адресов.

Мой broker.xml выглядит так:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>server1</name>
      <cluster-user>artemis</cluster-user>
      <cluster-password>artemis</cluster-password>
      <persistence-enabled>true</persistence-enabled>
      <journal-type>ASYNCIO</journal-type>
      <paging-directory>data/paging</paging-directory>
      <bindings-directory>data/bindings</bindings-directory>
      <journal-directory>data/journal</journal-directory>
      <large-messages-directory>data/large-messages</large-messages-directory>
      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
      <journal-file-size>10M</journal-file-size>
      <journal-buffer-timeout>20000</journal-buffer-timeout>
      <journal-max-io>4096</journal-max-io>
      <disk-scan-period>5000</disk-scan-period>
      <max-disk-usage>90</max-disk-usage>
      <critical-analyzer>true</critical-analyzer>
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61716?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
         <acceptor name="cluster-acceptor">tcp://0.0.0.0:61717</acceptor>
      </acceptors>
      <connectors>
         <connector name="netty-connector">tcp://localhost:61616</connector>
         <connector name="cluster-connector">tcp://localhost:61617</connector>
      </connectors>
       <cluster-connections>
          <cluster-connection name="k24-artemis-cluster">
            <address></address>
            <connector-ref>netty-connector</connector-ref>
            <check-period>5000</check-period>
            <retry-interval>500</retry-interval>
            <use-duplicate-detection>true</use-duplicate-detection>
            <message-load-balancing>ON_DEMAND</message-load-balancing>
            <max-hops>1</max-hops>
            <static-connectors>
              <connector-ref>cluster-connector</connector-ref>
            </static-connectors>
          </cluster-connection>
        </cluster-connections>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <address-setting match="k24.#">
            <redistribution-delay>0</redistribution-delay>
            <max-delivery-attempts>100</max-delivery-attempts>
            <redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
            <redelivery-delay>5000</redelivery-delay>
            <max-redelivery-delay>50000</max-redelivery-delay>
            <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-delete-addresses>true</auto-delete-addresses>
            <auto-create-queues>true</auto-create-queues>
            <auto-delete-queues>true</auto-delete-queues>
            <default-purge-on-no-consumers>false</default-purge-on-no-consumers>
            <max-size-bytes>104857600</max-size-bytes><!--100 Mb-->
            <page-size-bytes>20971520</page-size-bytes><!--20 Mb-->
            <address-full-policy>PAGE</address-full-policy>
          </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>
        <address name="k24.payment">
          <multicast>
            <queue name="k24.payment.bossbi">
              <durable>true</durable>
            </queue>
             <queue name="k24.payment.other">
              <durable>true</durable>
            </queue>
          </multicast>
        </address>
      </addresses>
   </core>
</configuration>

Журнал при запуске первого сервера:

2019-05-23 11:12:33,188 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address k24.payment supporting [MULTICAST]
2019-05-23 11:12:33,188 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying MULTICAST queue k24.payment.bossbi on address k24.payment
2019-05-23 11:12:33,188 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying MULTICAST queue k24.payment.other on address k24.payment
2019-05-23 11:12:33,366 INFO  [org.apache.activemq.audit.base] AMQ601019: User anonymous is getting mbean info on target resource: org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl@1b45c0e []
2019-05-23 11:12:33,373 INFO  [org.apache.activemq.audit.base] AMQ601019: User anonymous is getting mbean info on target resource: org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl@73a8da0f []
2019-05-23 11:12:33,391 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
2019-05-23 11:12:33,400 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61617 for protocols [CORE,MQTT,AMQP,HORNETQ,STOMP,OPENWIRE]
2019-05-23 11:12:33,403 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2019-05-23 11:12:33,403 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.8.0 [server0, nodeID=d6998d86-7d25-11e9-88ce-7c76357cb366]
2019-05-23 11:12:33,432 INFO  [org.apache.activemq.audit.base] AMQ601267: User anonymous is creating a core session on target resource ActiveMQServerImpl::serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366 [with parameters: [229b6b91-7d2a-11e9-94a2-106530a6cae3, artemis, ****, 102400, RemotingConnectionImpl [ID=3ded146a, clientID=null, nodeID=d6998d86-7d25-11e9-88ce-7c76357cb366, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection@3e514577[ID=3ded146a, local= /127.0.0.1:61616, remote=/127.0.0.1:52806]], true, true, true, false, null, org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback@39407896, true, OperationContextImpl [1688453060] [minimalStore=9223372036854775807, storeLineUp=0, stored=0, minimalReplicated=9223372036854775807, replicationLineUp=0, replicated=0, paged=0, minimalPage=9223372036854775807, pageLineUp=0, errorCode=-1, errorMessage=null, executorsPending=0, executor=OrderedExecutor(tasks=[])], {}]]
2019-05-23 11:12:33,461 INFO  [org.apache.activemq.audit.base] AMQ601267: User anonymous is creating a core session on target resource ActiveMQServerImpl::serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366 [with parameters: [22a110e2-7d2a-11e9-94a2-106530a6cae3, artemis, ****, 102400, RemotingConnectionImpl [ID=3ded146a, clientID=null, nodeID=d6998d86-7d25-11e9-88ce-7c76357cb366, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection@3e514577[ID=3ded146a, local= /127.0.0.1:61616, remote=/127.0.0.1:52806]], true, true, true, false, null, org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback@200d654a, true, OperationContextImpl [964255832] [minimalStore=9223372036854775807, storeLineUp=0, stored=0, minimalReplicated=9223372036854775807, replicationLineUp=0, replicated=0, paged=0, minimalPage=9223372036854775807, pageLineUp=0, errorCode=-1, errorMessage=null, executorsPending=0, executor=OrderedExecutor(tasks=[])], {}]]
2019-05-23 11:12:33,470 INFO  [org.apache.activemq.audit.base] AMQ601065: User artemis is creating a queue on target resource: ServerSessionImpl() [with parameters: [Address [name=activemq.notifications, id=0, routingTypes={MULTICAST}, autoCreated=false], notif.22a22253-7d2a-11e9-94a2-106530a6cae3.ActiveMQServerImpl_serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366, _AMQ_Binding_Type<>2 AND _AMQ_NotifType IN ('SESSION_CREATED','BINDING_ADDED','BINDING_REMOVED','CONSUMER_CREATED','CONSUMER_CLOSED','PROPOSAL','PROPOSAL_RESPONSE','UNPROPOSAL') AND _AMQ_Distance<1 AND (((_AMQ_Address NOT LIKE '$.artemis.internal.sf.%') AND (_AMQ_Address NOT LIKE 'activemq.management%'))) AND (_AMQ_NotifType = 'SESSION_CREATED' OR (_AMQ_Address NOT LIKE 'activemq.notifications%')), true, false, -1, false, false, false, -1, null, false, null, false, 0, -1, false, 0, 0, false]]
2019-05-23 11:12:33,494 INFO  [org.apache.activemq.audit.base] AMQ601019: User anonymous is getting mbean info on target resource: org.apache.activemq.artemis.core.management.impl.QueueControlImpl@28cb8c95 []
2019-05-23 11:12:33,507 INFO  [org.apache.activemq.audit.base] AMQ601265: User artemis is creating a core consumer on target resource ServerSessionImpl() [with parameters: [0, notif.22a22253-7d2a-11e9-94a2-106530a6cae3.ActiveMQServerImpl_serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366, null, 0, false, true, null]]
2019-05-23 11:12:33,542 INFO  [org.apache.activemq.artemis.core.server] AMQ221027: Bridge ClusterConnectionBridge@9fe7fc4 [name=$.artemis.internal.sf.k24-artemis-cluster.d8938882-7d25-11e9-9b96-106530a6cae3, queue=QueueImpl[name=$.artemis.internal.sf.k24-artemis-cluster.d8938882-7d25-11e9-9b96-106530a6cae3, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366], temp=false]@66c0982c targetConnector=ServerLocatorImpl (identity=(Cluster-connection-bridge::ClusterConnectionBridge@9fe7fc4 [name=$.artemis.internal.sf.k24-artemis-cluster.d8938882-7d25-11e9-9b96-106530a6cae3, queue=QueueImpl[name=$.artemis.internal.sf.k24-artemis-cluster.d8938882-7d25-11e9-9b96-106530a6cae3, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366], temp=false]@66c0982c targetConnector=ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=localhost], discoveryGroupConfiguration=null]]::ClusterConnectionImpl@1676605578[nodeUUID=d6998d86-7d25-11e9-88ce-7c76357cb366, connector=TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61716&host=localhost, address=, server=ActiveMQServerImpl::serverUUID=d6998d86-7d25-11e9-88ce-7c76357cb366])) [initialConnectors=[TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=localhost], discoveryGroupConfiguration=null]] is connected
2019-05-23 11:12:33,548 INFO  [org.apache.activemq.audit.message] AMQ601500: User artemis is sending a core message on target resource: ServerSessionImpl() [with parameters: [TransactionImpl [xid=null, txID=378, xid=null, state=ACTIVE, createTime=1558595553462(Thu May 23 11:12:33 SAMT 2019), timeoutSeconds=300, nr operations = 0]@5c8669b1, CoreMessage[messageID=0,durable=false,userID=null,priority=4, timestamp=Thu May 23 11:12:33 SAMT 2019,expiration=0, durable=false, address=activemq.management,size=457,properties=TypedProperties[_AMQ_OperationName=sendQueueInfoToQueue,_AMQ_ResourceName=broker]]@1649442918, true, false, RoutingContextImpl(Address=null, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)
..................................................
]]
2019-05-23 11:12:33,552 INFO  [org.apache.activemq.audit.base] AMQ601263: User artemis is handling a management message on target resource 22a110e2-7d2a-11e9-94a2-106530a6cae3 [with parameters: [TransactionImpl [xid=null, txID=378, xid=null, state=ACTIVE, createTime=1558595553462(Thu May 23 11:12:33 SAMT 2019), timeoutSeconds=300, nr operations = 0]@5c8669b1, CoreMessage[messageID=385,durable=false,userID=null,priority=4, timestamp=Thu May 23 11:12:33 SAMT 2019,expiration=0, durable=false, address=activemq.management,size=457,properties=TypedProperties[_AMQ_OperationName=sendQueueInfoToQueue,_AMQ_ResourceName=broker]]@1649442918, true]]

А на втором

2019-05-23 11:12:27,556 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address k24.payment supporting [MULTICAST]
2019-05-23 11:12:27,557 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying MULTICAST queue k24.payment.bossbi on address k24.payment
2019-05-23 11:12:27,557 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying MULTICAST queue k24.payment.other on address k24.payment
2019-05-23 11:12:27,734 INFO  [org.apache.activemq.audit.base] AMQ601019: User anonymous is getting mbean info on target resource: org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl@75d2da2d []
2019-05-23 11:12:27,736 INFO  [org.apache.activemq.audit.base] AMQ601019: User anonymous is getting mbean info on target resource: org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl@3370f42 []
2019-05-23 11:12:27,753 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61716 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
2019-05-23 11:12:27,762 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61717 for protocols [CORE,MQTT,AMQP,HORNETQ,STOMP,OPENWIRE]
2019-05-23 11:12:27,766 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2019-05-23 11:12:27,766 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.8.0 [server1, nodeID=d8938882-7d25-11e9-9b96-106530a6cae3]
2019-05-23 11:12:27,993 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2019-05-23 11:12:28,044 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2019-05-23 11:12:28,390 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2019-05-23 11:12:28,401 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2019-05-23 11:12:28,402 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to hawtio 1.5.5 : http://hawt.io/ : Don't cha wish your console was hawt like me? ;-)
2019-05-23 11:12:28,404 INFO  [io.hawt.jmx.UploadManager] Using file upload directory: /home/dmitry/work/tools/apache-artemis-2.6.2/broker1/tmp/uploads
2019-05-23 11:12:28,414 INFO  [io.hawt.web.AuthenticationFilter] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2019-05-23 11:12:28,441 INFO  [io.hawt.web.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:/home/dmitry/work/tools/apache-artemis-2.6.2/broker1/etc/jolokia-access.xml]
2019-05-23 11:12:28,461 INFO  [io.hawt.web.RBACMBeanInvoker] Using MBean [hawtio:type=security,area=jmx,rank=0,name=HawtioDummyJMXSecurity] for role based access control
2019-05-23 11:12:28,552 INFO  [io.hawt.system.ProxyWhitelist] Initial proxy whitelist: [localhost, 127.0.0.1, 10.255.100.22, 10.255.100.32, bdk-laptop.lan.itecos.com]
2019-05-23 11:12:28,761 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8261
2019-05-23 11:12:28,761 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8261/console/jolokia
2019-05-23 11:12:28,762 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8261/console
2019-05-23 11:12:33,417 INFO  [org.apache.activemq.audit.base] AMQ601267: User anonymous is creating a core session on target resource ActiveMQServerImpl::serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3 [with parameters: [2298850f-7d2a-11e9-bf72-106530a6cae3, artemis, ****, 102400, RemotingConnectionImpl [ID=05d224cf, clientID=null, nodeID=d8938882-7d25-11e9-9b96-106530a6cae3, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection@27357d98[ID=05d224cf, local= /127.0.0.1:61716, remote=/127.0.0.1:57184]], true, true, true, false, null, org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback@1c98f1c2, true, OperationContextImpl [92544794] [minimalStore=9223372036854775807, storeLineUp=0, stored=0, minimalReplicated=9223372036854775807, replicationLineUp=0, replicated=0, paged=0, minimalPage=9223372036854775807, pageLineUp=0, errorCode=-1, errorMessage=null, executorsPending=0, executor=OrderedExecutor(tasks=[])], {}]]
2019-05-23 11:12:33,450 INFO  [org.apache.activemq.audit.base] AMQ601267: User anonymous is creating a core session on target resource ActiveMQServerImpl::serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3 [with parameters: [229f89f0-7d2a-11e9-bf72-106530a6cae3, artemis, ****, 102400, RemotingConnectionImpl [ID=05d224cf, clientID=null, nodeID=d8938882-7d25-11e9-9b96-106530a6cae3, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection@27357d98[ID=05d224cf, local= /127.0.0.1:61716, remote=/127.0.0.1:57184]], true, true, true, false, null, org.apache.activemq.artemis.core.protocol.core.impl.CoreSessionCallback@6e34b6c6, true, OperationContextImpl [191090876] [minimalStore=9223372036854775807, storeLineUp=0, stored=0, minimalReplicated=9223372036854775807, replicationLineUp=0, replicated=0, paged=0, minimalPage=9223372036854775807, pageLineUp=0, errorCode=-1, errorMessage=null, executorsPending=0, executor=OrderedExecutor(tasks=[])], {}]]
2019-05-23 11:12:33,489 INFO  [org.apache.activemq.audit.base] AMQ601065: User artemis is creating a queue on target resource: ServerSessionImpl() [with parameters: [Address [name=activemq.notifications, id=0, routingTypes={MULTICAST}, autoCreated=false], notif.22a4ba11-7d2a-11e9-bf72-106530a6cae3.ActiveMQServerImpl_serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3, _AMQ_Binding_Type<>2 AND _AMQ_NotifType IN ('SESSION_CREATED','BINDING_ADDED','BINDING_REMOVED','CONSUMER_CREATED','CONSUMER_CLOSED','PROPOSAL','PROPOSAL_RESPONSE','UNPROPOSAL') AND _AMQ_Distance<1 AND (((_AMQ_Address NOT LIKE '$.artemis.internal.sf.%') AND (_AMQ_Address NOT LIKE 'activemq.management%'))) AND (_AMQ_NotifType = 'SESSION_CREATED' OR (_AMQ_Address NOT LIKE 'activemq.notifications%')), true, false, -1, false, false, false, -1, null, false, null, false, 0, -1, false, 0, 0, false]]
2019-05-23 11:12:33,537 INFO  [org.apache.activemq.audit.base] AMQ601019: User anonymous is getting mbean info on target resource: org.apache.activemq.artemis.core.management.impl.QueueControlImpl@7865d90f []
2019-05-23 11:12:33,547 INFO  [org.apache.activemq.audit.base] AMQ601265: User artemis is creating a core consumer on target resource ServerSessionImpl() [with parameters: [0, notif.22a4ba11-7d2a-11e9-bf72-106530a6cae3.ActiveMQServerImpl_serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3, null, 0, false, true, null]]
2019-05-23 11:12:33,582 INFO  [org.apache.activemq.artemis.core.server] AMQ221027: Bridge ClusterConnectionBridge@5c52e5b7 [name=$.artemis.internal.sf.k24-artemis-cluster.d6998d86-7d25-11e9-88ce-7c76357cb366, queue=QueueImpl[name=$.artemis.internal.sf.k24-artemis-cluster.d6998d86-7d25-11e9-88ce-7c76357cb366, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3], temp=false]@3bb7d355 targetConnector=ServerLocatorImpl (identity=(Cluster-connection-bridge::ClusterConnectionBridge@5c52e5b7 [name=$.artemis.internal.sf.k24-artemis-cluster.d6998d86-7d25-11e9-88ce-7c76357cb366, queue=QueueImpl[name=$.artemis.internal.sf.k24-artemis-cluster.d6998d86-7d25-11e9-88ce-7c76357cb366, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3], temp=false]@3bb7d355 targetConnector=ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61716&host=localhost], discoveryGroupConfiguration=null]]::ClusterConnectionImpl@1052212904[nodeUUID=d8938882-7d25-11e9-9b96-106530a6cae3, connector=TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=localhost, address=, server=ActiveMQServerImpl::serverUUID=d8938882-7d25-11e9-9b96-106530a6cae3])) [initialConnectors=[TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61716&host=localhost], discoveryGroupConfiguration=null]] is connected
2019-05-23 11:12:33,589 INFO  [org.apache.activemq.audit.message] AMQ601500: User artemis is sending a core message on target resource: ServerSessionImpl() [with parameters: [TransactionImpl [xid=null, txID=97, xid=null, state=ACTIVE, createTime=1558595553475(Thu May 23 11:12:33 SAMT 2019), timeoutSeconds=300, nr operations = 0]@d735c32, CoreMessage[messageID=0,durable=false,userID=null,priority=4, timestamp=Thu May 23 11:12:33 SAMT 2019,expiration=0, durable=false, address=activemq.management,size=457,properties=TypedProperties[_AMQ_OperationName=sendQueueInfoToQueue,_AMQ_ResourceName=broker]]@24312193, true, false, RoutingContextImpl(Address=null, routingType=null, PreviousAddress=null previousRoute:null, reusable=null, version=0)
..................................................
]]
2019-05-23 11:12:33,592 INFO  [org.apache.activemq.audit.base] AMQ601263: User artemis is handling a management message on target resource 229f89f0-7d2a-11e9-bf72-106530a6cae3 [with parameters: [TransactionImpl [xid=null, txID=97, xid=null, state=ACTIVE, createTime=1558595553475(Thu May 23 11:12:33 SAMT 2019), timeoutSeconds=300, nr operations = 0]@d735c32, CoreMessage[messageID=104,durable=false,userID=null,priority=4, timestamp=Thu May 23 11:12:33 SAMT 2019,expiration=0, durable=false, address=activemq.management,size=457,properties=TypedProperties[_AMQ_OperationName=sendQueueInfoToQueue,_AMQ_ResourceName=broker]]@24312193, true]]

Мой клиентский код написан на Scala. Тем не менее, думаю, он должен быть понятен.

import java.util

import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
import org.apache.activemq.artemis.api.core.client._
import org.apache.activemq.artemis.core.remoting.impl.netty.{NettyConnectorFactory, TransportConstants}
import org.slf4j.LoggerFactory
import org.testng.Assert
import org.testng.annotations.Test

import scala.collection.JavaConverters._

/**
 * Manual integration test for redistribution of multicast messages between two clustered
 * Artemis brokers.
 *
 * `write` publishes 50 durable messages to the `k24.payment` address through the broker
 * listening on port 61616; `read` consumes them from the `k24.payment.bossbi` queue through
 * the broker listening on port 61716. `read` can only pass if `write` has been run before it
 * and the messages have been moved to the second broker.
 *
 * Every session, session factory and server locator opened by a test is closed again, even
 * when an assertion or a broker call fails.
 */
class MulticastClusterMsgRedistributionTest {

  private val log = LoggerFactory.getLogger(getClass)

  private val AddressName = "k24.payment"
  private val QueueName1 = "k24.payment.bossbi"
  // Second queue bound to the same address; not consumed by this test.
  private val QueueName2 = "k24.payment.other"

  /** Number of messages `write` sends and `read` expects. */
  private val MessageCount = 50

  /** How long `read` waits for each message, in milliseconds. */
  private val ReceiveTimeoutMs = 2000L

  /** Sends `MessageCount` durable messages to the address, committing after each one. */
  @Test
  def write(): Unit =
    withSession(61616) { session =>
      val producer = session.createProducer(AddressName)
      session.start()
      (1 to MessageCount).foreach { i =>
        val msg = session.createMessage(true) // true = durable
        msg.writeBodyBufferString(s"msg $i")
        producer.send(msg)
        session.commit()
      }
    }

  /**
   * Consumes `MessageCount` messages from the first queue, addressed by its fully qualified
   * queue name (`address::queue`), and checks they arrive in the order `write` sent them.
   */
  @Test
  def read(): Unit =
    withSession(61716) { session =>
      val consumer = session.createConsumer(s"${AddressName}::${QueueName1}")
      session.start()
      (1 to MessageCount).foreach { i =>
        val msg = consumer.receive(ReceiveTimeoutMs)
        Assert.assertNotNull(msg, s"message $i was not received within $ReceiveTimeoutMs ms")
        val body = msg.getBodyBuffer.readString()
        Assert.assertEquals(body, s"msg $i")
        log.info(s"-> $body")
        msg.acknowledge()
        session.commit()
      }
    }

  /**
   * Runs `body` with a session connected to the broker on `port` and releases the session,
   * its factory and the factory's server locator afterwards, whether or not `body` throws.
   *
   * @param port port of the broker acceptor on localhost
   * @param body test logic to run against the open session
   */
  private def withSession(port: Int)(body: ClientSession => Unit): Unit = {
    val sessionFactory = createSessionFactory(port)
    try {
      // Arguments: xa = false, autoCommitSends = false, autoCommitAcks = false,
      // so the tests commit explicitly.
      val session = sessionFactory.createSession(false, false, false)
      try body(session)
      finally session.close()
    } finally {
      try sessionFactory.close()
      finally sessionFactory.getServerLocator.close()
    }
  }

  /**
   * Creates a session factory for the broker on `targetServerPort` using an HA server locator
   * with round-robin load balancing and a topology listener that logs cluster changes.
   *
   * The caller owns the returned factory and must close both it and its server locator.
   *
   * @param targetServerPort port of the broker acceptor on localhost
   * @return a connected session factory
   */
  private def createSessionFactory(targetServerPort: Int): ClientSessionFactory = {
    import scala.util.control.NonFatal

    val loc = ActiveMQClient.createServerLocatorWithHA(transportConfiguration(targetServerPort))
    try {
      loc.setUseTopologyForLoadBalancing(true)
      loc.setConnectionLoadBalancingPolicyClassName(classOf[RoundRobinConnectionLoadBalancingPolicy].getName)
      loc.addClusterTopologyListener(topologyListener(loc))

      val sessionFactory = loc.createSessionFactory()
      log.info(s"Session factory attached to ${sessionFactory.getConnection.getRemoteAddress}")
      sessionFactory
    } catch {
      case NonFatal(e) =>
        // Nobody else holds the locator yet, so release it before propagating the failure.
        loc.close()
        throw e
    }
  }

  /** Builds a Netty transport configuration for `localhost:port`. */
  private def transportConfiguration(port: Integer): TransportConfiguration = {
    val srvParams = new util.HashMap[String, Object]()
    srvParams.put(TransportConstants.HOST_PROP_NAME, "localhost")
    srvParams.put(TransportConstants.PORT_PROP_NAME, port)
    new TransportConfiguration(classOf[NettyConnectorFactory].getName, srvParams, s"srv_localhost_$port")
  }

  /** Creates a listener that logs every node up/down event together with the known topology. */
  private def topologyListener(loc: ServerLocator): ClusterTopologyListener = {
    def logClusterState(): Unit =
      log.info(s"cluster state:\n ${loc.getTopology.getMembers.asScala.map(memberState).mkString("\n")}")

    new ClusterTopologyListener {
      override def nodeUP(m: TopologyMember, last: Boolean): Unit = {
        log.info(s"\t artemis member up = ${memberState(m)}")
        logClusterState()
      }

      override def nodeDown(eventUID: Long, nodeID: String): Unit = {
        log.info(s"\t artemis member down = $nodeID")
        logClusterState()
      }
    }
  }

  /**
   * Renders a topology member as `<live> backup to <backup>`, where each side is the
   * `Option` of the connector's `host:port` (`None` when that connector is absent).
   */
  private def memberState(m: TopologyMember): String = {
    def endpoint(tc: TransportConfiguration): String = {
      val params = tc.getParams
      s"${params.get(TransportConstants.HOST_PROP_NAME)}:${params.get(TransportConstants.PORT_PROP_NAME)}"
    }

    val liveState = Option(m.getLive).map(endpoint)
    val backupState = Option(m.getBackup).map(endpoint)
    s"$liveState backup to $backupState"
  }
}

Я полагаю, что Artemis должен перераспределять сообщения из всех очередей, привязанных к multicast-адресу, с первого сервера на второй, когда на втором сервере есть потребители.

...