Как получать несколько сообщений в HiveMQ Client?(MQTT) - PullRequest
0 голосов
/ 11 июня 2019

Я пытаюсь выяснить, как получать несколько сообщений в HiveMQ Client, используя один и тот же блок try catch, даже используя разные клиенты. Я последовал этому примеру:


Приведенный выше пример отлично работает с одним клиентом и одной публикацией и подпиской, но я хотел бы выполнить несколько из этих действий в одном блоке try catch, если это возможно.

package com.main;

import java.util.UUID;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import java.util.logging.Logger;
import java.util.NoSuchElementException;

import java.util.logging.Level;
import java.util.concurrent.TimeUnit;

public class Main {

    private static final Logger LOGGER = Logger.getLogger(Main.class.getName());  // Creates a logger instance 

    public static void main(String[] args) {

            Mqtt5BlockingClient client1 = Mqtt5Client.builder()
            .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between 
            .serverHost("localhost")  // the host name or IP address of the MQTT server. Kept it for testing. localhost is default if not specified.
            .serverPort(1883)  // specifies the port of the server
            .buildBlocking();  // creates the client builder

            client1.connect();  // connects the client
            System.out.println("Client1 Connected");

            Mqtt5BlockingClient client2 = Mqtt5Client.builder()
                    .identifier(UUID.randomUUID().toString()) // the unique identifier of the MQTT client. The ID is randomly generated between 
                    .serverHost("localhost")  // the host name or IP address of the MQTT server. Kept it for testing. localhost is default if not specified.
                    .serverPort(1883)  // specifies the port of the server
                    .buildBlocking();  // creates the client builder

            client2.connect();  // connects the client
            System.out.println("Client2 Connected");            

            String testmessage = "How is it going!";
            byte[] messagebytesend = testmessage.getBytes();   // stores a message as a byte array to be used in the payload 

    try {  

        Mqtt5Publishes publishes = client1.publishes(MqttGlobalPublishFilter.ALL);  // creates a "publishes" instance thats used to queue incoming messages
                                                                                    // .ALL - filters all incoming Publish messages 
            client1.subscribeWith()  // creates a subscription 
            .topicFilter("test/something1/topic")  // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
            .qos(MqttQos.AT_LEAST_ONCE)  // Sets the QoS to 2 (At least once) 
            System.out.println("The client1 has subscribed");

            client1.publishWith()  // publishes the message to the subscribed topic 
            .topic("test/something1/topic")   // publishes to the specified topic
            .payload(messagebytesend)  // the contents of the message 
            System.out.println("The client1 has published");

         Mqtt5Publish receivedMessage = publishes.receive(5,TimeUnit.SECONDS).get(); // receives the message using the "publishes" instance waiting up to 5 seconds                                                                         // .get() returns the object if available or throws a NoSuchElementException 

         byte[] tempdata = receivedMessage.getPayloadAsBytes();    // converts the "Optional" type message to a byte array 
         String getdata = new String(tempdata); // converts the byte array to a String 

        client2.subscribeWith()  // creates a subscription 
           .topicFilter("test/something2/topic")  // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
           .qos(MqttQos.AT_LEAST_ONCE)  // Sets the QoS to 2 (At least once) 
           System.out.println("The client2 has subscribed");

         client2.publishWith()  // publishes the message to the subscribed topic 
            .topic("test/something2/topic")   // publishes to the specified topic
            .payload("The second message :P".getBytes())  // the contents of the message 
            System.out.println("The client2 has published");  

            // VV   Why isn't the publish instance below receiving the second message? Do i need another try catch?  VV

         receivedMessage = publishes.receive(5,TimeUnit.SECONDS).get(); // receives the message using the "publishes" instance waiting up to 5 seconds                                                                          // .get() returns the object if available or throws a NoSuchElementException 

         byte[] tempdata2 = receivedMessage.getPayloadAsBytes();    // converts the "Optional" type message to a byte array 
         getdata = new String(tempdata2); // converts the byte array to a String 


    catch (InterruptedException e) {    // Catches interruptions in the thread 
        LOGGER.log(Level.SEVERE, "The thread was interrupted while waiting for a message to be received", e);

    catch (NoSuchElementException e){
        System.out.println("There are no received messages");   // Handles when a publish instance has no messages 

    System.out.println("Client1 Disconnected");

    System.out.println("Client2 Disconnected");


Вывод, который я получаю:

Клиент1 подключен

Клиент2 подключен

Клиент1 подписался

Клиент1 опубликовал

Как дела!

Клиент2 подписался

Клиент2 опубликовал

Нет полученных сообщений

Клиент1 отключен

Клиент2 отключен

Требуемый вывод:

Клиент1 подключен

Клиент2 подключен

Клиент1 подписался

Клиент1 опубликовал

Как дела!

Клиент2 подписался

Клиент2 опубликовал

Второе сообщение: P

Клиент1 отключен

Клиент2 отключен

1 Ответ

2 голосов
/ 11 июня 2019

Я запустил ваш код и нашел этот журнал WARN:

2019-06-11 20:32:22,774 WARN  - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.

Кажется, вы забыли установить фильтр публикации для вашего второго клиента.И действительно, в коде, где вы ожидаете второго сообщения (для client2), вы проверяете поток сообщений client1.Поэтому вам просто нужно добавить фильтр публикации для client2:

Mqtt5Publishes publishesClient2 = client2.publishes(MqttGlobalPublishFilter.ALL);

, а затем дождаться сообщения для client2:

// VV   Why isn't the publish instance below receiving the second message? Do i need another try catch?  VV

     receivedMessage = publishesClient2.receive(5,TimeUnit.SECONDS).get(); 



Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published

How is it going!
The client2 has subscribed
The client2 has published

2019-06-11 20:46:36,537 WARN  - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
There are no received messages
Client1 Disconnected
Client2 Disconnected


Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published

How is it going!
The client2 has subscribed
The client2 has published

The second message :P
Client1 Disconnected
Client2 Disconnected

Редактировать: Я надеюсь, что этоРешение, которое вы ищете, так как желаемый результат не совпадает с тем, который я получаю с исправлением.Поскольку NoSuchElementException больше не выбрасывается / ловится.Поэтому после пропущенного второго сообщения «Нет принятых сообщений».

Редактировать в ответ на комментарий: Фрагмент для сбора сообщений публикации для client2 с асинхронной разновидностью (Просто замените код в блоке try накод ниже):

// The list where we put our received publish messages
            final List<Mqtt5Publish> incomingMessagesClient2 = new LinkedList<>();

            // With the async flavour we can add a consumer for the incoming publish messages
            client2.toAsync().publishes(MqttGlobalPublishFilter.ALL, mqtt5Publish ->

            client1.publishes(MqttGlobalPublishFilter.ALL);  // creates a "publishes" instance thats used to queue incoming messages

            client2.subscribeWith()  // creates a subscription
                    .topicFilter("test/something1/topic")  // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
                    .qos(MqttQos.AT_LEAST_ONCE)  // Sets the QoS to 2 (At least once)
            System.out.println("The client2 has subscribed");

            client1.publishWith()  // publishes the message to the subscribed topic
                    .topic("test/something1/topic")   // publishes to the specified topic
                    .payload(messagebytesend)  // the contents of the message
            System.out.println("The client1 has published");

            client1.publishWith()  // publishes the message to the subscribed topic
                    .topic("test/something1/topic")   // publishes to the specified topic
                    .payload("The second message :P".getBytes())  // the contents of the message
            System.out.println("The client1 has published");


            incomingMessagesClient2.forEach(mqtt5Publish -> System.out.println(new String(mqtt5Publish.getPayloadAsBytes())));

С уважением,

Майкл из команды HiveMQ
