Мы пытаемся опубликовать sh и подписаться на протокол MQTT, используя небольшие реактивные сообщения. Нам удалось опубликовать sh сообщение в указанной c теме / канале с помощью следующего простого кода:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
@ApplicationScoped
public class Publish {
@Outgoing("pao")
public Multi<String> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> "A Message in here");
}
}
Что мы хотим сделать, так это вызывать всякий раз, когда нам нужен метод generate()
. с Dynami c topi c, где пользователь определит его. Это была наша проблема, но затем мы нашли эти классы из этого репозитория в github. Имя пакета io.smallrye.reactive.messaging.mqtt
Например, мы обнаружили, что существует класс, который сообщает, что он делает вызов publi sh брокеру MQTT (сервер Mosquitto включен).
Здесь, в этом утверждении SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false);
Мы получаем красное подчеркивание под SendingMqttMessage<String>
, говоря: 'SendingMqttMessage (java .lang.String, java .lang.String, io.netty.handler.code c .mqtt. MqttQoS, boolean) 'не публикуется c в' io.smallrye.reactive.messaging.mqtt.SendingMqttMessage '. Невозможно получить доступ из внешнего пакета
ОБНОВЛЕНИЕ (Publi sh выполнено) Наконец сделал запрос Publi sh к брокеру mqtt (серверу mosquitto) и все это с динамический c топи c настроенный пользователем. Как мы выяснили, предыдущий класс SendingMqttMessage
вообще не предполагалось использовать. И мы выяснили, что нам также нужен и эмиттер, чтобы фактически сделать запрос publi sh с динамическим c topi c.
@Inject
@Channel("panatha")
Emitter<String> emitter;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createUser(Device device) {
System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
return Response.ok().status(Response.Status.CREATED).build();
}
Теперь нам нужно узнать о подписке на топи c динамически.