Publi sh / Подписка на MQTT с использованием динамического обмена сообщениями Smallrye. - PullRequest
1 голос
/ 13 июля 2020

Мы пытаемся опубликовать sh и подписаться на протокол MQTT, используя небольшие реактивные сообщения. Нам удалось опубликовать sh сообщение в указанной c теме / канале с помощью следующего простого кода:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class Publish {
    
    @Outgoing("pao")
    public Multi<String> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> "A Message in here");
    }
}

Что мы хотим сделать, так это вызывать всякий раз, когда нам нужен метод generate(). с Dynami c topi c, где пользователь определит его. Это была наша проблема, но затем мы нашли эти классы из этого репозитория в github. Имя пакета io.smallrye.reactive.messaging.mqtt

Например, мы обнаружили, что существует класс, который сообщает, что он делает вызов publi sh брокеру MQTT (сервер Mosquitto включен).

Здесь, в этом утверждении SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false); Мы получаем красное подчеркивание под SendingMqttMessage<String>, говоря: 'SendingMqttMessage (java .lang.String, java .lang.String, io.netty.handler.code c .mqtt. MqttQoS, boolean) 'не публикуется c в' io.smallrye.reactive.messaging.mqtt.SendingMqttMessage '. Невозможно получить доступ из внешнего пакета

ОБНОВЛЕНИЕ (Publi sh выполнено) Наконец сделал запрос Publi sh к брокеру mqtt (серверу mosquitto) и все это с динамический c топи c настроенный пользователем. Как мы выяснили, предыдущий класс SendingMqttMessage вообще не предполагалось использовать. И мы выяснили, что нам также нужен и эмиттер, чтобы фактически сделать запрос publi sh с динамическим c topi c.

    @Inject
    @Channel("panatha")
    Emitter<String> emitter;

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createUser(Device device) {
        System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
        emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
        return Response.ok().status(Response.Status.CREATED).build();
    }

Теперь нам нужно узнать о подписке на топи c динамически.

...