У меня 3 дня на эту проблему: я использую spring -gration-mqtt для чтения из topic1 простого сообщения типа json:
{speed:40,
position:"NewYork"
}
если скорость> 50 записи " ок "в теме 2.
Я правильно прочитал json. но когда я пишу "хорошо" в Topic2, у меня появляется эта ошибка:
org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutbound'; defined in: 'class path resource [it/almaviva/mqtt/Configuration/Producer/Producer.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b95e48b']; nested exception is Connessione già in corso (32110)
Это класс Consumer, который читает и пишет на Producer ..
@Configuration
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@Autowired MqttPahoClientFactory mqttClientFactory;
@Autowired
Producer producer;
@Autowired
private ApplicationContext context;
//@Autowired
//static Producer.ProducerGateway gw;
@Bean
public MessageChannel topicChannel(){
return new DirectChannel();
}
@Bean
public MessageProducer mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
Parameters.MQTT_CLIENT_ID, mqttClientFactory, Parameters.TOPICS[0]);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(topicChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "topicChannel")
public MessageHandler handler() {
return message -> {
OduModel model = JsonUtility.convertToOduModel(String.valueOf(message.getPayload()));
logger.info(model.toString());
boolean condition = DataProcessor.processOduModel(model);
logger.info(String.valueOf(condition));
//ApplicationContext context = new SpringApplicationBuilder(Producer.class).context();
Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
if (condition){
gw.sendToMqtt("OK"!, Parameters.TOPICS[1]);
}
else{
gw.sendToMqtt("No OK", Parameters.TOPICS[1]);
}
};
}
}
Parameters.TOPICS [0] является topic1, где я читаю входящее сообщение. Parameters.TOPICS [1] - это "topic2", где я пишу "хорошо" или "не хорошо".
Правильно ли, в частности, использовать этот код в MessageHandler ??
Producer.ProducerGateway gw = context.getBean(Producer.ProducerGateway.class);
if (condition){
gw.sendToMqtt(""OK", Parameters.TOPICS[1]);
}
else{
gw.sendToMqtt("not OK", Parameters.TOPICS[1]);
}
Я использую эта зависимость от пом. xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.2.3.RELEASE</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jackson-module-kotlin</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
</exclusion>
</exclusions>
</dependency>
Большое спасибо. Привет