Как переслать входящие данные через REST в поток SSE в Quarkus - PullRequest
2 голосов
/ 23 апреля 2020

В моих настройках я хочу пересылать определенные изменения статуса через канал SSE (события, отправленные сервером). Изменения состояния инициируются путем вызова конечной точки REST. Итак, мне нужно переслать входящее изменение статуса в поток SSE.

Каков наилучший / самый простой способ выполнить sh это в Quarkus.

Одно решение, о котором я могу подумать, это использовать EventBus (https://quarkus.io/guides/reactive-messaging). Конечная точка SSE будет подписываться на изменения статуса и отправлять их по каналу SSE. Конечная точка изменения статуса публикует соответствующие события.

Это жизнеспособное решение? Есть ли другие (более простые) решения? Нужно ли мне использовать реактивные вещи в любом случае, чтобы выполнить sh это?

Любая помощь очень ценится!

Ответы [ 2 ]

1 голос
/ 23 апреля 2020

Дмитрий, спасибо, что указал мне правильное направление. Я выбрал Мятеж в связи с Kotlin. Мой код теперь выглядит так:

data class DeviceStatus(var status: Status = Status.OFFLINE) {
    enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}

@ApplicationScoped
class DeviceStatusService {
    var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
    var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)

    fun pushDeviceStatus(deviceStatus: DeviceStatus) {
        deviceStatusProcessor.onNext(deviceStatus)
    }

    fun getStream(): Multi<DeviceStatus> {
        return Multi.createFrom().publisher(deviceStatusQueue)
    }
}

@Path("/deviceStatus")
class DeviceStatusResource {
    private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")

    @Inject
    @field: Default
    lateinit var deviceStatusService: DeviceStatusService

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    fun status(status: DeviceStatus): Response {
        LOGGER.info("POST /deviceStatus " + status.status);
        deviceStatusService.pushDeviceStatus(status)
        return Response.ok().build();
    }

    @GET
    @Path("/eventStream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    fun stream(): Multi<DeviceStatus>? {
        return deviceStatusService.getStream()
    }
}

В качестве минимальной настройки служба может напрямую использовать deviceStatusProcessor в качестве издателя. Однако Flowable добавляет буферизацию. Комментарии к реализации приветствуются.

1 голос
/ 23 апреля 2020

Самый простой способ - использовать rx java в качестве поставщика потоков. Для начала нужно добавить зависимость rx java. Он может go либо из реактивных зависимостей в quarkus, таких как kafka, либо напрямую (если вам не нужны потоковые библиотеки):

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.19</version>
        </dependency>

Вот пример того, как отправить случайное двойное значение каждую секунду:

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<Double> stream() {
        return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
    }

Мы создаем новый Flowable, который будет срабатывать каждую секунду, и на каждом тике мы генерируем следующий случайный дубль. Изучите любые другие варианты того, как вы можете создать Flowable, например Flowable.fromFuture(), чтобы адаптировать его для вашего указанного c кода logi c.

PS-код выше будет генерировать новый Flowable каждый раз, когда вы запрашиваете эту конечную точку, Я сделал это для экономии места, в вашем случае я предполагаю, что у вас будет один источник событий, который вы можете создать один раз и использовать один и тот же экземпляр каждый раз, когда конечная точка запрашивает

...