Связь Websocket против Netty Environment - PullRequest
0 голосов
/ 19 марта 2020

Мне нужно реализовать связь между двумя Java средами. Получателем является реактивное приложение SpringBoot, и фрагмент для обработки связи выглядит следующим образом (я пропущу настройку bean-компонентов)

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive() // <- Step 0
        .map(message -> {
            log.info("Step 1");
            return message.getPayloadAsText();
        })
        .map(message -> {
            log.info("Step 2");
            return webSocketSession.textMessage(this.receiveMessage(message));
        }));
}

Клиентская часть реализована с использованием Http API из java 11

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    }).join();


webSocket.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true);
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "ok").thenRun(() -> log.info("Sent close"));

Используя отладку, я могу заметить, что после завершения join() и возврата экземпляра WebSocket выполняется метод на шаге 0 получателя и возвращается экземпляр Mono<Void> .

Но проблема в том, что даже если я отправляю какой-то текст, шаги 1 и 2 никогда не выполняются!

Если я пытаюсь выполнить обратный обмен данными (отправка чего-либо из приложения SpringBoot в приложение Sender ) сообщения получены.

Наконец, это журнал выполнения обратного вызова onClose после оператора sendClose.

Closed with status 1002, reason: Server internal error

1 Ответ

0 голосов
/ 19 марта 2020

РЕШЕНИЕ

Поскольку метод buildAsync возвращает экземпляр CompletableFuture<WebSocket>, нам нужна цепочка отправки сообщений перед тем, как flu sh очередь сообщений, используя join()

Вот решение

WebSocket webSocket = HttpClient
    .newBuilder().executor(executor).build()
    .newWebSocketBuilder()
    .buildAsync(URI.create(url), new WebSocket.Listener() {
        @Override
        public void onOpen(WebSocket webSocket) {
            log.info("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            log.info("onText received with data " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            log.info("Closed with status " + statusCode + ", reason: " + reason);
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            log.error("Error: " + error.getMessage());
            WebSocket.Listener.super.onError(webSocket, error);
        }

    })
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), false))
    .thenCompose(ws -> ws.sendText(toJSON(List.of("Lorem", "Ipsum", "dolor", "sit", "amet")), true))
    .thenCompose(ws -> webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""))
    .join();

...