Отправка асин c сообщения от микронавта ServerWebSocket - PullRequest
1 голос
/ 19 февраля 2020

Я пытаюсь реализовать простой сервер WebSocket с использованием Micronaut (и Groovy). Я новичок в Micronaut, поэтому изучаю его как I go, и с ним, кажется, требуется Rx Java. Так что изучая это также.

Я могу заставить работать пример basi c:

@CompileStatic
@ServerWebSocket("/ws")
class TimeseriesWebSocket {
    @OnOpen
    public Publisher<String> onOpen(WebSocketSession session) {
        println("opening connection")
        return session.send("Hello")
    }

    @OnMessage
    public Publisher<String> onMessage(String message, WebSocketSession session) {
        println("message received")
        return session.send("Thanks for the message")
    }

    @OnClose
    public Publisher<String> onClose(WebSocketSession session) {
        println("Closing")
        //this seems not to be delivered, I guess due to session being closed already
        return session.send("OK")
    }
}

Таким образом, это выводит на печать все сообщения, которые я туда помещаю, и отлично работает при подключении клиента. Клиент также видит сообщения «Привет» и «Спасибо за сообщение», которые возвращаются с помощью session.send (..).

Теперь моя проблема в том, что я пытаюсь отправить сообщение вне этих методов. Например:

@CompileStatic
@ServerWebSocket("/ws")
class MyWebSocket {
    @OnOpen
    public Publisher<String> onOpen(WebSocketSession session) {
        println("opening connection")
        startPing()
        return session.send("Hello")
    }

 //...(same as above)

    public void startPing(WebSocketSession session) {
        PingPing ping = new PingPing(session)
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        exec.scheduleAtFixedRate(ping, 0, 1, TimeUnit.SECONDS);
    }
}

class PingPing {
    WebSocketSession session

    public PingPing(WebSocketSession session) {
        this.session = session
    }

    @Override
    void run() {
        println("pinging..")
        try {
            session.send("Ping...")
        } catch (Exception e) {
            e.printStackTrace()
        }
    }
}

Это выполняется, но на стороне клиента ничего не появляется. Теперь, если я изменяю session.send () на session.sendSyn c (), он работает нормально. Пинг доставлен.

Подпись send () на самом деле

default <T> Publisher<T> send(T message)

Сначала я подумал, что должен предоставить издателю какой-то другой источник для его отправки. Но я думаю, что это не так. Я понял, что это своего рода объект Future, поэтому, если я сам на него подпишусь так:

def publisher = session.send("Ping...")
publisher.subscribe(new Subscriber<GString>() {
    @Override
    void onSubscribe(Subscription s) {
        println("subscription")
    }

    @Override
    void onNext(GString gString) {
        println("next")
    }

    @Override
    void onError(Throwable t) {
        println("error")
        t.printStackTrace()
    }

    @Override
    void onComplete() {
        println("complete")
    }
})
println("publisher: ${publisher}")

Запуск вышеуказанного фрагмента кода (с подпиской), я предполагаю, что он вызывает session.send () в текущем потоке и возвращает результат. Но где мне на самом деле это назвать? На какой нить? Я посмотрел на планировщики Rx Java, но не мог понять, откуда его вызывать.

Кроме того, результат выполнения вышеуказанного фактически доставляет сообщение клиенту, но также выдает ошибку:

error
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:438)
    at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:406)
    at io.micronaut.http.netty.websocket.NettyRxWebSocketSession.lambda$null$2(NettyRxWebSocketSession.java:191)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)

Что такое обратное давление и значения / запросы, на которые это ссылается, и как на самом деле я должен обрабатывать асинхронную c отправку сообщения? Я ожидаю, что он просто отправит один элемент, который я пытаюсь отправить ..

Документация Micronaut API упоминает о следующем javax.websocket API, но javax.websocket asyn c API , кажется, имеет больше смысла, просто предоставляя будущее для прослушивания.

Итак, короткий вопрос: как использовать API-интерфейс Micronaut Websocket для отправки сообщения в асинхронном режиме c за пределами функций, предоставляемых Micronaut? Или я все делаю не так?

Возможно, я делаю некоторые общие неверные предположения об этом API, но не могу понять это из документов и не могу найти пример.

1 Ответ

1 голос
/ 26 февраля 2020

У меня была такая же проблема с micronaut v1.3.2.

Мне удалось заставить его работать с sendAsync / sendSyn c вместо send.

Я посмотрел на реализацию NettyRxWebSocketSession и похоже, что send реализована несколько иначе, чем sendAsyn c. Не ясно, если это из-за конфигурации или просто проблема с реализацией отправки.

Надеюсь, это поможет.

...