Reactor-netty TCPClient не может получать ответы - PullRequest
0 голосов
/ 29 октября 2018

Я пытаюсь сделать предпроектный опыт работы с последней версией реактора-нетто, в которой отсутствует документация; Я использую версию 0.8.0.M3.

Я разработал простое приложение для весенней загрузки с этим tcp-сервером, которое запускается правильно и, кажется, работает:

@PostConstruct
public void startServer() throws InterruptedException {
    TcpServer.create().
              host("localhost").
              port(1235).
              handle((in, out) -> {

                    Flux<String> fluxString = in.receive().asString().log().
                        map(text -> {
                            return "Hi server have received "+text;});  
                        return out.sendString(fluxString).then();

              } 
              ).
              wiretap().bindNow();
}

Если я пытаюсь протестировать с помощью клиента, взаимодействие кажется правильным, но я не могу получить ответ:

int counter = 10;
    CountDownLatch latch = new CountDownLatch(counter);
    Flux<String> input = Flux.range(0, counter).map(i->""+i);

    TcpClient.create().
      host("localhost").
      port(1235).
      handle((in, out) -> {

          in.receive().subscribe(receiv -> {System.out.println(receiv);latch.countDown();});
                return out.sendString(input).neverComplete();

      } 
      ).
      wiretap().connectNow();
    System.out.println("waiting closure");
    boolean result = latch.await(5, TimeUnit.SECONDS);

при просмотре журнала прослушивания кажется, что клиенты отправляют каждое int как строку отдельно, а сервер получает только одну агрегированную строку "0123456789" и отправляет только один ответ. Клиент ничего не получает, и защелка не уменьшается ни на 1, а остается на 10 (я ожидаю, что будет получен хотя бы один агрегированный ответ).

Может ли кто-нибудь объяснить, что не так с клиентом и как получать каждое целое число отдельно от сервера?

Thx G

1 Ответ

0 голосов
/ 14 декабря 2018

Есть несколько проблем, которые вам, возможно, придется исправить. И я хотел бы сказать, что это немного сложно для изучения.

Для сервера:

    TcpServer.create()
            .host("localhost")
            .port(1235)
            .doOnConnection(c ->
                    //The origin input are 0,1,2,3,4,5,6,7,8,9.
                    //So you need a decoder split every 1 byte as a ByteBuf.
                    c.addHandler(
                            "1ByteStringDecoder",
                            new ByteToMessageDecoder() {
                                @Override
                                protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                                    out.add(in.readBytes(1));
                                }
                            }
                    )
            )
            .handle((in, out) -> {
                        Flux<String> fluxString = in.receive()
                                .asString()
                                .log()
                                .map(text -> {
                                    return "Hi server have received " + text;
                                });
                        //Since the output is quite small, you need flush it
                        return out.options(o -> o.flushOnEach())
                                .sendString(fluxString)
                                .neverComplete();

                    }
            )
            .wiretap()
            .bindNow();

Для клиента:

    int counter = 10;
    CountDownLatch latch = new CountDownLatch(counter);
    startServer();
    Flux<String> input = Flux.range(0, counter)
            .map(i -> "" + i);

    TcpClient.create()
            .host("localhost")
            .port(1235)
            .doOnConnected(c ->
                    c.addHandler(
                            //The covert input are "Hi server have received " + (0,1,2,3,4,5,6,7,8,9).
                            //So you need a decoder split every 25 byte as a ByteBuf.
                            "25ByteStringDecoder",
                            new ByteToMessageDecoder() {
                                @Override
                                protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                                    out.add(in.readBytes(25));
                                }
                            }
                    )
            )
            .handle((in, out) -> {
                        in.receive()
                                .asString()//You need convert ByteBuf to String.
                                .subscribe(receiv -> {
                                    System.out.println(receiv);
                                    latch.countDown();
                                });

                        out.options(o -> o.flushOnEach())
                                .sendString(input)
                                .then()
                                .subscribe(); //You need to ask your client to send the data by subscribe
                        return Mono.never();

                    }
            )
            .wiretap()
            .connectNow();
    System.out.println("waiting closure");
    boolean result = latch.await(5, TimeUnit.SECONDS);
...