Подписка СИ на несколько тем mqtt - PullRequest
0 голосов
/ 25 января 2019

Я пытаюсь научиться обрабатывать MQTT-сообщения в Spring-Integration .Создал конвертер, который подписывается с помощью одного MqttPahoMessageDrivenChannelAdapter на тему MQTT для использования и преобразования сообщений.

Проблема в том, что наш провайдер данных планирует «ускорить» публикацию сообщений на его стороне.Таким образом, вместо нескольких (<= 10) тем, в каждой из которых есть сообщения с примерно 150 полями, планируется опубликовать каждое из этих полей в отдельной теме MQTT. </p>

Это означает, что мой конвертер должен будет использоватьca.1000 тем mqtt, но я не знаю, является ли

  1. Spring-интеграция все еще хорошим выбором для него.Причины афаик.упомянутый адаптер использует PAHO MqttClient, который будет принимать сообщения от всех тем, на которые он подписан, в одном потоке, и создание 1000 экземпляров этих адаптеров является излишним.
  2. Если мы будем придерживаться Spring-интеграциии использовать предоставленные компоненты, было бы неплохо создать единый входящий адаптер для всех полей, которые ранее были в сообщениях одной темы, но перенесли преобразование из компонента адаптера в отдельный компонент (который выполняет преобразование) соединяется с каналом-исполнителем для адаптера и, таким образом, выполняет преобразование этих полей в некотором пуле потоков параллельно.

Заранее спасибо за ваши ответы!

1 Ответ

0 голосов
/ 25 января 2019

Я думаю, что ваша идея имеет смысл.

Для этой цели вам нужно реализовать сквозной MqttMessageConverter и предоставить MqttMessage как payload и topic какзаголовок:

public class PassThroughMqttMessageConverter implements MqttMessageConverter {

    @Override
    public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
        return MessageBuilder.withPayload(mqttMessage)
                .setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
                .build();
    }

    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        return null;
    }

    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        return null;
    }

}

Итак, вы действительно сможете выполнить целевое преобразование вниз по потоку после упомянутого ExecutorChannel в пользовательском transformer.

. Вы также можете рассмотреть возможностьреализовать пользовательский MqttPahoClientFactory (также может работать расширение DefaultMqttPahoClientFactory) и предоставить пользовательский ScheduledExecutorService для внедрения в MqttClient, который вы собираетесь создать в getClientInstance().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...