В Spring Boot встроен брокер ActiveMQ как для событий приложений, так и для потребителей MQTT - PullRequest
0 голосов
/ 02 апреля 2020

Приложение My Spring Boot внутренне использует встроенный экземпляр брокера ActiveMQ для пользовательских событий приложения. Я использую простые jmsTemplate для отправки событий и использую их слушателями, объявленными с использованием аннотации @JmsListener.

Теперь, зная, что ActiveMQ также поддерживает протокол MQTT, как мне добавить поддержку MQTT в эту настройку, чтобы что я мог бы использовать подобный подход для потребления сообщений, поступающих в определенные темы по протоколу MQTT?

Издатели MQTT были бы внешними устройствами, поэтому мне нужно было бы внешнее приложение, чтобы иметь возможность отправлять сообщения через MQTT, и в идеале пусть @JmsListener поднимет их.

Возможно ли это?

Я могу подключиться к этому брокеру с помощью приложения MQTT Explorer и предположительно отправить сообщение, но я его не вижу быть выбранным аннотированным методом @JmsListener(destination = TEST_TOPIC, containerFactory = "myFactory").

Я знаю, что наличие внешнего независимого брокера предпочтительнее, но в моей настройке это было бы на самом деле предпочтительным способом.

Конфигурация брокера

    @Bean
    public BrokerService broker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector(format("mqtt://localhost:%d", mqttPort));
        broker.setBrokerName("my-embedded-broker");
        broker.setUseJmx(false);
        broker.start();
        return broker;
    }

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://my-embedded-broker?create=false");
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jmsConnectionFactory());
        jmsTemplate.setDefaultDestinationName(TEST_TOPIC);
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(jmsConnectionFactory());
        factory.setAutoStartup(true);
        factory.setMessageConverter(jacksonJmsMessageConverter());
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);

        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    public MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return new MappingJackson2HttpMessageConverter(mapper);
    }

Пружинные загрузочные бревна

o.apache.activemq.broker.BrokerService   : Loaded the Bouncy Castle security provider.
o.apache.activemq.broker.BrokerService   : Using Persistence Adapter: KahaDBPersistenceAdapter[.../activemq-data/my-embedded-broker/KahaDB]
o.a.a.broker.jmx.ManagementContext       : JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[.../activemq-data/my-embedded-broker/tmp_storage] started
o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.9 (my-embedded-broker, ~~~~~~~~~~~) is starting
o.a.a.t.TransportServerThreadSupport     : Listening for connections at: mqtt://localhost:1883
o.a.activemq.broker.TransportConnector   : Connector mqtt://localhost:1883 started
o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.9 (my-embedded-broker, ~~~~~~~~~~~) started
o.apache.activemq.broker.BrokerService   : For help or more information please see: http://activemq.apache.org
...
o.a.activemq.broker.TransportConnector   : Connector vm://my-embedded-broker started
...