spring -gration-mqtt Сообщение о маршруте из входной темы1 в выходную тему2 - PullRequest
1 голос
/ 16 марта 2020

У меня 3 дня на эту проблему: я использую spring -gration-mqtt для чтения из topic1 простого сообщения типа json:

{speed:40,
position:"NewYork"
}

если скорость> 50 записи " ок "в теме 2.

Я правильно прочитал json. но когда я пишу "хорошо" в Topic2, у меня появляется эта ошибка:

org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutbound'; defined in: 'class path resource [it/almaviva/mqtt/Configuration/Producer/Producer.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b95e48b']; nested exception is Connessione già in corso (32110)

Это класс Consumer, который читает и пишет на Producer ..

@Configuration
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

@Autowired MqttPahoClientFactory mqttClientFactory;
@Autowired
Producer producer;
@Autowired
private ApplicationContext context;
//@Autowired
//static Producer.ProducerGateway gw;

@Bean
public MessageChannel topicChannel(){
    return new DirectChannel();
}

@Bean
public MessageProducer mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            Parameters.MQTT_CLIENT_ID, mqttClientFactory, Parameters.TOPICS[0]);
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    adapter.setOutputChannel(topicChannel());
    return adapter;
}

@Bean
@ServiceActivator(inputChannel = "topicChannel")
public MessageHandler handler() {
    return message -> {
        OduModel model = JsonUtility.convertToOduModel(String.valueOf(message.getPayload()));
        logger.info(model.toString());
        boolean condition = DataProcessor.processOduModel(model);
        logger.info(String.valueOf(condition));
        //ApplicationContext context = new SpringApplicationBuilder(Producer.class).context();
        Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
        if (condition){
            gw.sendToMqtt("OK"!, Parameters.TOPICS[1]);

        }
        else{
            gw.sendToMqtt("No OK", Parameters.TOPICS[1]);
        }

    };
}

}

Parameters.TOPICS [0] является topic1, где я читаю входящее сообщение. Parameters.TOPICS [1] - это "topic2", где я пишу "хорошо" или "не хорошо".

Правильно ли, в частности, использовать этот код в MessageHandler ??

 Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
        if (condition){
            gw.sendToMqtt(""OK", Parameters.TOPICS[1]);

        }
        else{
            gw.sendToMqtt("not OK", Parameters.TOPICS[1]);
        }

Я использую эта зависимость от пом. xml

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
  <version>5.2.3.RELEASE</version>
  <scope>compile</scope>
  <exclusions>
    <exclusion>
      <artifactId>jackson-module-kotlin</artifactId>
      <groupId>com.fasterxml.jackson.module</groupId>
    </exclusion>
  </exclusions>
</dependency>

Большое спасибо. Привет

...