Я использую MQTT Client с Eclipse Paho и у меня есть некоторые проблемы:
И издатель, и подписчик подключаются к брокеру с qos = 1 и setCleanSession =
ложь.
Мой поток:
- Подключите подписчика и издателя к брокеру, все в порядке.
- Отключить подписчика (я принудительно прекращаю работу над моим проектом, в который входит подписчик), издатель продолжает публикацию сообщения.
- Переподключить подписчика -> он не может подключиться и выдать исключение: потерянное соединение.
Если я установил qos of Subscriber = 0, он не выбрасывает исключение, но клиент не получает сообщения, отправленные издателем, пока подписчик находится в автономном режиме, что мне не нужно
Может кто-нибудь помочь мне с этим?
Это мой код в подписчике
try {
// Create an Mqtt client
MqttAsyncClient mqttClient
= new MqttAsyncClient("tcp://" + swmConfig.getMqttApiLink(), "MeasureTransactionApi");
// new MqttAsyncClient(serverURI, clientId, persistence)
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(swmConfig.getMqttUsername());
connOpts.setPassword(swmConfig.getMqttPassword().toCharArray());
connOpts.setCleanSession(false);
// Connect to RabbitMQ Broker
log.info("Connecting to RabbitMQ broker: " + swmConfig.getMqttApiLink());
IMqttToken conToken = mqttClient.connect(connOpts);
conToken.waitForCompletion(10000);
if (!conToken.isComplete() || conToken.getException() != null) {
log.info("Error connecting: " + conToken.getException());
System.exit(-1);
}
log.info("Connected");
// Latch used for synchronizing b/w threads
final CountDownLatch latch = new CountDownLatch(1);
// Callback - Anonymous inner-class for receiving messages
mqttClient.setCallback(new MqttCallback() {
public void messageArrived(String topic, MqttMessage message) {
String time = new Timestamp(System.currentTimeMillis()).toString();
log.info("\nReceived a Message from RabbitMQ Broker" + "\n\tTime: " + time
+ "\n\tTopic: " + topic + "\n\tMessage: "
+ new String(message.getPayload()) + "\n\tQoS: "
+ message.getQos() + "\n");
handleMQTTMessageService.handleMessageArrived(message);
}
public void connectionLost(Throwable cause) {
log.info("Connection to RabbitMQ broker lost!" + cause.getMessage());
latch.countDown();
}
public void deliveryComplete(IMqttDeliveryToken token) {
log.info("deliveryComplete");
}
});
// Subscribe client to the topic filter with QoS level of 1
log.info("Subscribing client to topic: " + topic);
IMqttToken subToken = mqttClient.subscribe(topic, 1);
subToken.waitForCompletion(10000);
if (!subToken.isComplete() || subToken.getException() != null) {
log.info("Error subscribing: " + subToken.getException());
System.exit(-1);
}
} catch (MqttException me) {
log.error("Error:", me);
}