Клиент Mqtt с Eclipse Paho - PullRequest
0 голосов
/ 08 мая 2018

Я использую MQTT Client с Eclipse Paho и у меня есть некоторые проблемы:

И издатель, и подписчик подключаются к брокеру с qos = 1 и setCleanSession = ложь.

Мой поток:

  1. Подключите подписчика и издателя к брокеру, все в порядке.
  2. Отключить подписчика (я принудительно прекращаю работу над моим проектом, в который входит подписчик), издатель продолжает публикацию сообщения.
  3. Переподключить подписчика -> он не может подключиться и выдать исключение: потерянное соединение.

Если я установил qos of Subscriber = 0, он не выбрасывает исключение, но клиент не получает сообщения, отправленные издателем, пока подписчик находится в автономном режиме, что мне не нужно

Может кто-нибудь помочь мне с этим?


Это мой код в подписчике

try {
            // Create an Mqtt client
            MqttAsyncClient mqttClient
                    = new MqttAsyncClient("tcp://" + swmConfig.getMqttApiLink(), "MeasureTransactionApi");
            // new MqttAsyncClient(serverURI, clientId, persistence)
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(swmConfig.getMqttUsername());
            connOpts.setPassword(swmConfig.getMqttPassword().toCharArray());
            connOpts.setCleanSession(false);

            // Connect to RabbitMQ Broker
            log.info("Connecting to RabbitMQ broker: " + swmConfig.getMqttApiLink());
            IMqttToken conToken = mqttClient.connect(connOpts);
            conToken.waitForCompletion(10000);
            if (!conToken.isComplete() || conToken.getException() != null) {
                log.info("Error connecting: " + conToken.getException());
                System.exit(-1);
            }
            log.info("Connected");

            // Latch used for synchronizing b/w threads
            final CountDownLatch latch = new CountDownLatch(1);

            // Callback - Anonymous inner-class for receiving messages
            mqttClient.setCallback(new MqttCallback() {

                public void messageArrived(String topic, MqttMessage message) {
                    String time = new Timestamp(System.currentTimeMillis()).toString();
                    log.info("\nReceived a Message from RabbitMQ Broker" + "\n\tTime:    " + time
                            + "\n\tTopic:   " + topic + "\n\tMessage: "
                            + new String(message.getPayload()) + "\n\tQoS:     "
                            + message.getQos() + "\n");

                    handleMQTTMessageService.handleMessageArrived(message);
                }

                public void connectionLost(Throwable cause) {
                    log.info("Connection to RabbitMQ broker lost!" + cause.getMessage());
                    latch.countDown();
                }

                public void deliveryComplete(IMqttDeliveryToken token) {
                    log.info("deliveryComplete");
                }

            });

            // Subscribe client to the topic filter with QoS level of 1
            log.info("Subscribing client to topic: " + topic);
            IMqttToken subToken = mqttClient.subscribe(topic, 1);
            subToken.waitForCompletion(10000);
            if (!subToken.isComplete() || subToken.getException() != null) {
                log.info("Error subscribing: " + subToken.getException());
                System.exit(-1);
            }
        } catch (MqttException me) {
            log.error("Error:", me);
        }

1 Ответ

0 голосов
/ 08 мая 2018

QOS не зависит от издателей и подписчиков.

Чтобы обеспечить доставку подписывающемуся клиенту, вам нужно подписаться с более чем QOS 0.

То, что происходит с подписками QOS 0, зависит от посредника, по умолчанию большинство не будет помещать в очередь сообщения для подписок QOS 0, но mosquitto может быть принудительно установлен с флагом конфигурации queue_qos0_messages

...