реактивная публикация данных WebSocketClient - PullRequest
0 голосов
/ 29 марта 2019

Я пытаюсь прочитать данные из веб-сокета, используя ReactorNettyWebSocketClient, но я просто не могу подключиться к его API. Проблема в том, что все данные, которые я получаю, доступны во внутренней части лямбда-стиля websockethandler (# 1), но я хочу сделать их доступными для подписчиков после .subscribe на client.execute (..) (# 2)

WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(
                URI.create(URL),
                session -> session.send(
                        Mono.just(session.textMessage(pairRqStr)))
                        .thenMany(session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .map(this::toResp)
                                .onErrorContinue((throwable, o) -> throwable.getMessage())
                        )
                        .log() // #1
                        .then()
        )
                .log()
                .subscribe(System.out::println); // #2

немного потерян и новичок в этом, так что, пожалуйста, наведите меня.

Ответы [ 2 ]

0 голосов
/ 01 апреля 2019

это то, что я сделал в конце концов, но мне все еще не очень нравится:

    private Flux<WebSocketMessage> requestData(String req) {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        return ConnectableFlux.create(sub -> {
            client.execute(
                    URI.create(URL),
                    session -> session.send(
                            Mono.just(session.textMessage(req)))
                            .thenMany(session.receive().doOnNext(sub::next))
                            .then()
            )
                    .log()
                    .subscribe();
        });
    }
0 голосов
/ 30 марта 2019

Для отправки или получения сообщений сначала необходимо подключиться к каналу.Вот почему client.execute возвращает Mono<Void>, что означает, что он не возвращает никаких данных, он просто сигнализирует о завершении или сбое хэндскахе.Если он вернется, например.Flux<WebSocketMessage>, как бы вы узнали, успешно ли завершился рукопожатие?

Если вы хотите получить доступ к каналу вне лямбды, вы не можете сделать это, реализовав метод обработчика:

        Consumer<WebSocketMessage> printingConsumer = webSocketMessage -> System.out.println(webSocketMessage.getPayloadAsText());

        client.execute(URI.create(URL), session -> handle(session, printingConsumer));
    }


    public Mono<Void> handle(WebSocketSession session, Consumer<WebSocketMessage> consumer) {
        return session.receive()
                .doOnNext(consumer::accept)
                .then();
    }
...