Как получить моно сообщение после отправки другого сообщения - PullRequest
0 голосов
/ 15 апреля 2019

Как отправить сообщение и получить сообщение

Способ

Mono<ByteBuf> send(ByteBuf buf){
    // how to send the buf then receive a msg
}

Я пытаюсь реализовать этот метод, отправив сообщение с исходящего соединения и получив сообщение с входящего, а затем вернув сообщение Моно. Но я могу получить сообщение только в методе (Publisher). Кажется, он не может вернуться к данным Mono

Я пробовал это.

// the connecttion has been initialized before entering this method.

        Mono.just(buf)
                .doOnNext(data -> connection.outbound().sendObject(data).then().subscribe())
                .then(connection
                        .inbound()
                        .receiveObject()
                        .single()
                        .map(RpcDataPackage.class::cast)
                        .map(RpcDataPackage::getData)
                        .map(data -> {
                            try {
                                return resCodec.decode(data);
                            } catch (IOException e) {
                                throw new RpcRequestException(e);
                            }
                        })
                );

но будет блокироваться до истечения времени ожидания соединения

И я попробовал другой код. Я добавляю handle метод и помещаю ответ на карту. Тогда я могу получить Mono.fromSupply() с разрывом цикла while в map.get(key) != null.

Это заблокирует поток.

                .handle(((nettyInbound, nettyOutbound) -> nettyInbound
                        .receiveObject()
                        .map(RpcDataPackage.class::cast)
                        .doOnNext(pkg -> {
                            String responseKey = "a key"

                            responseMap.put(responseKey, pkg);
                        })
                        .then()))

Ответы [ 3 ]

0 голосов
/ 16 апреля 2019

Я прочитал Mono Javadoc и нашел MonoSink.

Mono.create(monoSink -> {
  // some call
})

при получении входящего ответа объекта просто сделайте sink.success()

0 голосов
/ 23 апреля 2019

Вы должны использовать комбинацию NettyOutbound :: then для прослушивания завершения записи и Mono :: then для чтения вашего NettyInboud после записи.

  Mono<String> resposeMono = TcpClient.create()
            .connect()
            .flatMap(connection -> connection.outbound().sendString(Mono.just("Hello!"))
                    .then()
                    .then(connection.inbound().receive().aggregate().asString())
                    .doOnTerminate(connection::dispose));

Это напишет «Hello!»к выводу прочитайте все байты из ввода как строку и затем удалите соединение.

0 голосов
/ 15 апреля 2019

Вы не указываете, что вы ожидаете.См. Пример ниже, он отправляет некоторые данные и затем получает то, что вернул сервер.

    @Test
    public void test() {
        Connection connection =
                TcpClient.create()
                         .wiretap(true)
                         .host("example.com")
                         .port(80)
                         .connect()
                         .block();

        assertNotNull(connection);

        connection.outbound()
                  .sendObject(Unpooled.wrappedBuffer("test".getBytes()))
                  .then(connection.inbound()
                                  .receiveObject()
                                  .last()
                                  .doOnNext(System.out::println)
                                  .then())
                  .then()
                  .block();
    }
...