Spring Integration (MQTT): получение опубликованного сообщения - PullRequest
0 голосов
/ 01 февраля 2019

Я новичок в весенней загрузке и пытаюсь использовать пример примера из весенней интеграции для подписки и публикации с использованием MQTT.Мне удается интегрировать его с Thingsboard, и регистратор в приведенном ниже коде может получить опубликованное сообщение от Thingsboard.

public static void main(String[] args) {
    SpringApplication.run(MqttTest.class);
}

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] { "URI HERE" });
    options.setUserName("ACCESS TOKEN HERE");
    factory.setConnectionOptions(options);
    return factory;
}

// consumer

@Bean
public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p)
            .handle(logger())
            .get();
}

private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("LoggerBot");
    return loggingHandler;
}

@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("Consumer",
            mqttClientFactory(), "v1/devices/me/rpc/request/+");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

Это вывод консоли.Я могу получить опубликованное сообщение json, отправленное с панели инструментов.Мне интересно, есть ли метод вызова для извлечения строки сообщения json, чтобы я мог обработать ее дальше.Спасибо.

2019-02-01 14:06:23.590  INFO 13416 --- [ Call: Consumer] LoggerBot : {"method":"setValue","params":true}
2019-02-01 14:06:24.840  INFO 13416 --- [ Call: Consumer] LoggerBot : {"method":"setValue","params":false}

1 Ответ

0 голосов
/ 01 февраля 2019

Чтобы обработать опубликованные сообщения, подпишите дескрипторы сообщений для потока, который будет использовать сообщения.

MessageHandler

    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p)
                .handle( mess -> {
                   System.out.println("mess"+mess);
                 })            
                .get();
    }

ServiceActivator


    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p)
                .handle("myService","handleHere")
                .handle(logger())
                .get();
    }

@Component
public class MyService {

    @ServiceActivator
    public Object handleHere(@Payload Object mess) {
        System.out.println("payload "+mess);
        return mess;
    }
}

Примечание. Как мы уже говорили, существует множество различных способов.достижения этого.Это просто пример для вашего понимания.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...