Я запустил ваш код и нашел этот журнал WARN:
2019-06-11 20:32:22,774 WARN - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
Кажется, вы забыли установить фильтр публикации для вашего второго клиента.И действительно, в коде, где вы ожидаете второго сообщения (для client2), вы проверяете поток сообщений client1.Поэтому вам просто нужно добавить фильтр публикации для client2:
Mqtt5Publishes publishesClient2 = client2.publishes(MqttGlobalPublishFilter.ALL);
, а затем дождаться сообщения для client2:
// VV Why isn't the publish instance below receiving the second message? Do i need another try catch? VV
receivedMessage = publishesClient2.receive(5,TimeUnit.SECONDS).get();
Результаты:
До:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
2019-06-11 20:46:36,537 WARN - No publish flow registered for MqttStatefulPublish{stateless=MqttPublish{topic=test/something2/topic, payload=21byte, qos=AT_LEAST_ONCE, retain=false}, packetIdentifier=51, dup=false, topicAlias=0, subscriptionIdentifiers=[1]}.
There are no received messages
Client1 Disconnected
Client2 Disconnected
После:
Client1 Connected
Client2 Connected
The client1 has subscribed
The client1 has published
How is it going!
The client2 has subscribed
The client2 has published
The second message :P
Client1 Disconnected
Client2 Disconnected
Редактировать: Я надеюсь, что этоРешение, которое вы ищете, так как желаемый результат не совпадает с тем, который я получаю с исправлением.Поскольку NoSuchElementException больше не выбрасывается / ловится.Поэтому после пропущенного второго сообщения «Нет принятых сообщений».
Редактировать в ответ на комментарий: Фрагмент для сбора сообщений публикации для client2 с асинхронной разновидностью (Просто замените код в блоке try накод ниже):
// The list where we put our received publish messages
final List<Mqtt5Publish> incomingMessagesClient2 = new LinkedList<>();
// With the async flavour we can add a consumer for the incoming publish messages
client2.toAsync().publishes(MqttGlobalPublishFilter.ALL, mqtt5Publish ->
incomingMessagesClient2.add(mqtt5Publish));
client1.publishes(MqttGlobalPublishFilter.ALL); // creates a "publishes" instance thats used to queue incoming messages
client2.subscribeWith() // creates a subscription
.topicFilter("test/something1/topic") // filters to receive messages only on this topic (# = Multilevel wild card, + = single level wild card)
.qos(MqttQos.AT_LEAST_ONCE) // Sets the QoS to 2 (At least once)
.send();
System.out.println("The client2 has subscribed");
client1.publishWith() // publishes the message to the subscribed topic
.topic("test/something1/topic") // publishes to the specified topic
.qos(MqttQos.AT_LEAST_ONCE)
.payload(messagebytesend) // the contents of the message
.send();
System.out.println("The client1 has published");
client1.publishWith() // publishes the message to the subscribed topic
.topic("test/something1/topic") // publishes to the specified topic
.qos(MqttQos.AT_LEAST_ONCE)
.payload("The second message :P".getBytes()) // the contents of the message
.send();
System.out.println("The client1 has published");
System.out.println();
TimeUnit.SECONDS.sleep(5);
incomingMessagesClient2.forEach(mqtt5Publish -> System.out.println(new String(mqtt5Publish.getPayloadAsBytes())));
С уважением,
Майкл из команды HiveMQ