Я пытаюсь реализовать простой сервер WebSocket с использованием Micronaut (и Groovy). Я новичок в Micronaut, поэтому изучаю его как I go, и с ним, кажется, требуется Rx Java. Так что изучая это также.
Я могу заставить работать пример basi c:
@CompileStatic
@ServerWebSocket("/ws")
class TimeseriesWebSocket {
@OnOpen
public Publisher<String> onOpen(WebSocketSession session) {
println("opening connection")
return session.send("Hello")
}
@OnMessage
public Publisher<String> onMessage(String message, WebSocketSession session) {
println("message received")
return session.send("Thanks for the message")
}
@OnClose
public Publisher<String> onClose(WebSocketSession session) {
println("Closing")
//this seems not to be delivered, I guess due to session being closed already
return session.send("OK")
}
}
Таким образом, это выводит на печать все сообщения, которые я туда помещаю, и отлично работает при подключении клиента. Клиент также видит сообщения «Привет» и «Спасибо за сообщение», которые возвращаются с помощью session.send (..).
Теперь моя проблема в том, что я пытаюсь отправить сообщение вне этих методов. Например:
@CompileStatic
@ServerWebSocket("/ws")
class MyWebSocket {
@OnOpen
public Publisher<String> onOpen(WebSocketSession session) {
println("opening connection")
startPing()
return session.send("Hello")
}
//...(same as above)
public void startPing(WebSocketSession session) {
PingPing ping = new PingPing(session)
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleAtFixedRate(ping, 0, 1, TimeUnit.SECONDS);
}
}
class PingPing {
WebSocketSession session
public PingPing(WebSocketSession session) {
this.session = session
}
@Override
void run() {
println("pinging..")
try {
session.send("Ping...")
} catch (Exception e) {
e.printStackTrace()
}
}
}
Это выполняется, но на стороне клиента ничего не появляется. Теперь, если я изменяю session.send () на session.sendSyn c (), он работает нормально. Пинг доставлен.
Подпись send () на самом деле
default <T> Publisher<T> send(T message)
Сначала я подумал, что должен предоставить издателю какой-то другой источник для его отправки. Но я думаю, что это не так. Я понял, что это своего рода объект Future, поэтому, если я сам на него подпишусь так:
def publisher = session.send("Ping...")
publisher.subscribe(new Subscriber<GString>() {
@Override
void onSubscribe(Subscription s) {
println("subscription")
}
@Override
void onNext(GString gString) {
println("next")
}
@Override
void onError(Throwable t) {
println("error")
t.printStackTrace()
}
@Override
void onComplete() {
println("complete")
}
})
println("publisher: ${publisher}")
Запуск вышеуказанного фрагмента кода (с подпиской), я предполагаю, что он вызывает session.send () в текущем потоке и возвращает результат. Но где мне на самом деле это назвать? На какой нить? Я посмотрел на планировщики Rx Java, но не мог понять, откуда его вызывать.
Кроме того, результат выполнения вышеуказанного фактически доставляет сообщение клиенту, но также выдает ошибку:
error
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:438)
at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:406)
at io.micronaut.http.netty.websocket.NettyRxWebSocketSession.lambda$null$2(NettyRxWebSocketSession.java:191)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
Что такое обратное давление и значения / запросы, на которые это ссылается, и как на самом деле я должен обрабатывать асинхронную c отправку сообщения? Я ожидаю, что он просто отправит один элемент, который я пытаюсь отправить ..
Документация Micronaut API упоминает о следующем javax.websocket API, но javax.websocket asyn c API , кажется, имеет больше смысла, просто предоставляя будущее для прослушивания.
Итак, короткий вопрос: как использовать API-интерфейс Micronaut Websocket для отправки сообщения в асинхронном режиме c за пределами функций, предоставляемых Micronaut? Или я все делаю не так?
Возможно, я делаю некоторые общие неверные предположения об этом API, но не могу понять это из документов и не могу найти пример.