Весенняя загрузка при загрузке apache activemq в artemis - PullRequest
0 голосов
/ 29 марта 2020

Я использую apache activemq с пружинной загрузкой и хочу перейти на apache artemis для улучшения использования кластера и узлов.

В настоящее время я использую в основном концепцию VirtualTopics и с JMS, такими как

@JMSListener(destination = "Consumer.A.VirtualTopic.simple")
public void receiveMessage() {
    ...
}

...

public void send(JMSTemplate template) {
    template.convertAndSend("VirtualTopic.simple", "Hello world!");
}

Я прочитал, что Артемис изменил свою модель адресов на адреса, очереди и типы маршрутизации вместо очередей, тем и виртуальных тем как в activemq. Я прочитал намного больше, но я думаю, что не понимаю, как я могу мигрировать сейчас. Я попробовал это так же, как и выше, поэтому я импортировал Artemis JMSClient из Maven и хотел использовать его, как и раньше, но с FQQN (полное имя очереди) или VirtualTopi c -Wildcard вы можете прочитать на некоторых источниках. Но так или иначе это не работает должным образом.

Мои вопросы: - Как я могу перенести VirtualTopics? Я правильно понял с FQQN и этими виртуальными символами-подстановками? - Как я могу указать типы маршрутизации anycast и multicast для примеров кода выше? (В сетевых примерах адреса и очереди жестко закодированы в брокере серверов. xml, но я хочу создать его на лету приложения.) - Как я могу использовать его с протоколом openwire и как приложение знает, что оно использует? Зависит ли это только от порта, которым я пользуюсь? Итак, 61616 для openwire?

Может кто-нибудь помочь прояснить мои мысли?

ОБНОВЛЕНИЕ:

Некоторые дополнительные вопросы.

1 Я всегда читаю что-то вроде «потребитель по умолчанию 5.x». Ожидается ли тогда смешаться с Артемидой? Как если бы вы оставили все эти соглашения об именах и просто добавили адреса к имени VirtualTopi c в FQQN, и просто изменили зависимости на artemis?

2) Я уже попробовал «virtualTopicConsumerWildcards» с "import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;" и "import org.apache.activemq.ActiveMQConnectionFactory;", но только во втором случае это имело значение.

3) Я также пытался использовать только OpenWire в качестве протокола в акцепторе, но в этом случае (и с "import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;") я получаю следующую ошибку при запуске моего приложения: "2020-03-30 11:41:19,504 ERROR [org.apache.activemq.artemis.core.server] AMQ224096: Error setting up connection from /127.0.0.1:54201 to /127.0.0.1:61616; protocol CORE not found in map: [OPENWIRE]".

4) Должен ли я указать, например, multicast:://VirtualTopic.simple как имя получателя в template.convertAndSend(...)? Я попытался template.setPubSubDomain(true) для типа многоадресной маршрутизации и оставил его для anycast, это работает. Но это хороший способ?

5) Может быть, вы знаете, как я могу "сказать" моему приложению spring-boot с template.convertAndSend(...); использовать Openwire?

UPDATE2 : Общие долговременные подписки

@JmsListener(destination = "VirtualTopic.test", id = "c1", subscription = "Consumer.A.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive1(String m) {

}

@JmsListener(destination = "VirtualTopic.test", id = "c2", subscription = "Consumer.B.VirtualTopic.test", containerFactory = "queueConnectionFactory")
public void receive2(String m) {

}

@Bean
public DefaultJmsListenerContainerFactory queueConnectionFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setClientId("brokerClientId");
    factory.setSubscriptionDurable(true);
    factory.setSubscriptionShared(true);
    return factory;
}

Ошибки:

2020-04-17 11:23:44.485  WARN 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'VirtualTopic.test' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer; 
2020-04-17 11:23:44.514 ERROR 7900 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'VirtualTopic.test' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Broker: d1 - Client: brokerClientId already connected from /127.0.0.1:59979

Что я здесь не так делаю?

1 Ответ

2 голосов
/ 30 марта 2020

Идея, стоящая за виртуальными темами, заключается в том, что производители отправляют на topi c обычным способом JMS, а потребитель может использовать физическую очередь для подписки на логический topi c, что позволяет многим пользователям работать на многих машинах. & потоки для балансировки нагрузки.

Артемида использует очередь для каждой модели подписчика topi c внутри, и, возможно, она напрямую обращается к очереди подписки, используя полное имя очереди (FQQN ).

Например, получатель по умолчанию 5.x для topi c VirtualTopi c .простая подписка A Consumer.A.VirtualTopic.simple будет заменен на FQQN Artemis, состоящий из адреса и очереди VirtualTopic.simple::Consumer.A.VirtualTopic.simple .

Однако Artemis поддерживает механизм фильтрации подстановочных знаков виртуальных топик c, который автоматически преобразует получателя в соответствующее FQQN. Чтобы включить механизм фильтрации, можно использовать свойство строки конфигурации virtualTopicConsumerWildcards. Он состоит из двух частей, разделенных ;, ie виртуальным топи по умолчанию 5.x c с префиксом потребителя Consumer.*., для которого требуется фильтр virtualTopicConsumerWildcards Consumer.*.>;2.

Artemis по умолчанию настроен на автоматическое создание пунктов назначения, запрошенных клиентами. Они могут указать специальный префикс при подключении к адресу, чтобы указать, какой тип маршрутизации использовать. Их можно включить, добавив свойство строки конфигурации anycastPrefix и multicastPrefix в акцептор, более подробную информацию можно найти в Использование префиксов для определения типа маршрутизации . Например, добавление к получателю anycastPrefix=anycast://;multicastPrefix=multicast://, если клиенту необходимо отправить сообщение только в одну из очередей ANYCAST, следует использовать пункт назначения anycast:://VirtualTopic.simple, если клиенту необходимо отправить сообщение в MULTICAST, следует использовать пункт назначения multicast:://VirtualTopic.simple.

Artemis акцепторы поддерживают использование одного порта для всех протоколов, они автоматически определяют, какой протокол используется CORE, AMQP, STOMP или OPENWIRE, но можно ограничить, какие протоколы поддерживаются с помощью параметра protocol.

Следующий акцептор включает префикс anycast anycast://, префикс многоадресной рассылки multicast:// и подстановочные знаки виртуального topi c, отключая все протоколы, кроме OPENWIRE на локальном хосте конечной точки : 61616.

<acceptor name="artemis">tcp://localhost:61616?anycastPrefix=anycast://;multicastPrefix=multicast://;virtualTopicConsumerWildcards=Consumer.*.%3E%3B2;protocols=OPENWIRE</acceptor>

ОБНОВЛЕНИЕ: В следующем примере приложение подключается к экземпляру Artemis с предыдущим получателем по протоколу OpenWire.

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class Application {

   private final String BROKER_URL = "tcp://localhost:61616";
   private final String BROKER_USERNAME = "admin";
   private final String BROKER_PASSWORD = "admin";

   public static void main(String[] args) throws Exception {
      final ConfigurableApplicationContext context = SpringApplication.run(Application.class);
      System.out.println("********************* Sending message...");

      JmsTemplate jmsTemplate = context.getBean("jmsTemplate", JmsTemplate.class);
      JmsTemplate jmsTemplateAnycast = context.getBean("jmsTemplateAnycast", JmsTemplate.class);
      JmsTemplate jmsTemplateMulticast = context.getBean("jmsTemplateMulticast", JmsTemplate.class);

      jmsTemplateAnycast.convertAndSend("VirtualTopic.simple", "Hello world anycast!");
      jmsTemplate.convertAndSend("anycast://VirtualTopic.simple", "Hello world anycast using prefix!");
      jmsTemplateMulticast.convertAndSend("VirtualTopic.simple", "Hello world multicast!");
      jmsTemplate.convertAndSend("multicast://VirtualTopic.simple", "Hello world multicast using prefix!");

      System.out.print("Press any key to close the context");
      System.in.read();

      context.close();
   }

   @Bean
   public ActiveMQConnectionFactory connectionFactory(){
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
      connectionFactory.setBrokerURL(BROKER_URL);
      connectionFactory.setUserName(BROKER_USERNAME);
      connectionFactory.setPassword(BROKER_PASSWORD);
      return connectionFactory;
   }

   @Bean
   public JmsTemplate jmsTemplate(){
      JmsTemplate template = new JmsTemplate();
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateAnycast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(false);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public JmsTemplate jmsTemplateMulticast(){
      JmsTemplate template = new JmsTemplate();
      template.setPubSubDomain(true);
      template.setConnectionFactory(connectionFactory());
      return template;
   }

   @Bean
   public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrency("1-1");
      return factory;
   }

   @JmsListener(destination = "Consumer.A.VirtualTopic.simple")
   public void receiveMessageFromA(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM A: " + message);
   }

   @JmsListener(destination = "Consumer.B.VirtualTopic.simple")
   public void receiveMessageFromB(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM B: " + message);
   }

   @JmsListener(destination = "VirtualTopic.simple")
   public void receiveMessageFromTopic(String message) {
      System.out.println("*********************** MESSAGE RECEIVED FROM TOPIC: " + message);
   }
}
...