Приложение My Spring Boot внутренне использует встроенный экземпляр брокера ActiveMQ для пользовательских событий приложения. Я использую простые jmsTemplate
для отправки событий и использую их слушателями, объявленными с использованием аннотации @JmsListener
.
Теперь, зная, что ActiveMQ также поддерживает протокол MQTT, как мне добавить поддержку MQTT в эту настройку, чтобы что я мог бы использовать подобный подход для потребления сообщений, поступающих в определенные темы по протоколу MQTT?
Издатели MQTT были бы внешними устройствами, поэтому мне нужно было бы внешнее приложение, чтобы иметь возможность отправлять сообщения через MQTT, и в идеале пусть @JmsListener поднимет их.
Возможно ли это?
Я могу подключиться к этому брокеру с помощью приложения MQTT Explorer и предположительно отправить сообщение, но я его не вижу быть выбранным аннотированным методом @JmsListener(destination = TEST_TOPIC, containerFactory = "myFactory")
.
Я знаю, что наличие внешнего независимого брокера предпочтительнее, но в моей настройке это было бы на самом деле предпочтительным способом.
Конфигурация брокера
@Bean
public BrokerService broker() throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector(format("mqtt://localhost:%d", mqttPort));
broker.setBrokerName("my-embedded-broker");
broker.setUseJmx(false);
broker.start();
return broker;
}
@Bean
public ConnectionFactory jmsConnectionFactory() {
return new ActiveMQConnectionFactory("vm://my-embedded-broker?create=false");
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(jmsConnectionFactory());
jmsTemplate.setDefaultDestinationName(TEST_TOPIC);
return jmsTemplate;
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(jmsConnectionFactory());
factory.setAutoStartup(true);
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setPubSubDomain(true);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new MappingJackson2HttpMessageConverter(mapper);
}
Пружинные загрузочные бревна
o.apache.activemq.broker.BrokerService : Loaded the Bouncy Castle security provider.
o.apache.activemq.broker.BrokerService : Using Persistence Adapter: KahaDBPersistenceAdapter[.../activemq-data/my-embedded-broker/KahaDB]
o.a.a.broker.jmx.ManagementContext : JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[.../activemq-data/my-embedded-broker/tmp_storage] started
o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.15.9 (my-embedded-broker, ~~~~~~~~~~~) is starting
o.a.a.t.TransportServerThreadSupport : Listening for connections at: mqtt://localhost:1883
o.a.activemq.broker.TransportConnector : Connector mqtt://localhost:1883 started
o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.15.9 (my-embedded-broker, ~~~~~~~~~~~) started
o.apache.activemq.broker.BrokerService : For help or more information please see: http://activemq.apache.org
...
o.a.activemq.broker.TransportConnector : Connector vm://my-embedded-broker started