Reactor Netty - как отправить с задержкой Flux - PullRequest
2 голосов
/ 06 марта 2019

В Reactor Netty при отправке данных на канал TCP через out.send(publisher) можно ожидать, что любой издатель будет работать. Однако если вместо простого немедленного Flux мы используем более сложный с задержанными элементами, то он перестает работать должным образом. Например, если мы возьмем этот эхо-сервер TCP hello world, он будет работать как положено:

import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.time.Duration;

public class Reactor1 {
    public static void main(String[] args) throws Exception {
        DisposableServer server = TcpServer.create()
            .port(3344)
            .handle((in, out) -> in
                .receive()
                .asString()
                .flatMap(s ->
                    out.sendString(Flux.just(s.toUpperCase()))
                ))
            .bind()
            .block();
        server.channel().closeFuture().sync();
    }
}

Однако, если мы изменим out.sendString на

out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))

тогда мы ожидаем, что для каждого полученного элемента будет произведен вывод с задержкой в ​​одну секунду.

Однако сервер ведет себя так, что если он получает несколько элементов в течение интервала, он выдаст вывод только для первого элемента. Например, ниже мы вводим aa и bb в течение первой секунды, но только AA выводится в качестве выходных данных (через одну секунду):

$ nc localhost 3344
aa
bb
AA <after one second>

Затем, если мы позже введем дополнительную строку, мы получим вывод (через одну секунду), но из предыдущего ввода:

cc
BB <after one second>

Есть идеи, как заставить send() работать как положено с задержкой Flux?

Ответы [ 2 ]

1 голос
/ 06 марта 2019

Попробуйте использовать concatMap .Это работает:

DisposableServer server = TcpServer.create()
        .port(3344)
        .handle((in, out) -> in
                .receive()
                .asString()
                .concatMap(s ->
                        out.sendString(Flux.just(s.toUpperCase())
                                           .delayElements(Duration.ofSeconds(1)))
                ))
            .bind()
            .block();
server.channel().closeFuture().sync();

Задержка на входящий трафик

DisposableServer server = TcpServer.create()
        .port(3344)
        .handle((in, out) -> in
                .receive()
                .asString()
                .timestamp()
                .delayElements(Duration.ofSeconds(1))
                .concatMap(tuple2 ->
                        out.sendString(
                                Flux.just(tuple2.getT2().toUpperCase() +
                                        " " +
                                        (System.currentTimeMillis() - tuple2.getT1())
                                ))
                ))
        .bind()
        .block();
1 голос
/ 06 марта 2019

Я думаю, вы не должны воссоздавать издателя для out.sendString(...) Это работает:

DisposableServer server = TcpServer.create()
        .port(3344)
        .handle((in, out) -> out
                .options(NettyPipeline.SendOptions::flushOnEach)
                .sendString(in.receive()
                        .asString()
                        .map(String::toUpperCase)
                        .delayElements(Duration.ofSeconds(1))))
        .bind()
        .block();
server.channel().closeFuture().sync();
...