В Reactor Netty при отправке данных на канал TCP через out.send(publisher)
можно ожидать, что любой издатель будет работать. Однако если вместо простого немедленного Flux
мы используем более сложный с задержанными элементами, то он перестает работать должным образом.
Например, если мы возьмем этот эхо-сервер TCP hello world, он будет работать как положено:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1 {
public static void main(String[] args) throws Exception {
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
}
}
Однако, если мы изменим out.sendString
на
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
тогда мы ожидаем, что для каждого полученного элемента будет произведен вывод с задержкой в одну секунду.
Однако сервер ведет себя так, что если он получает несколько элементов в течение интервала, он выдаст вывод только для первого элемента. Например, ниже мы вводим aa
и bb
в течение первой секунды, но только AA
выводится в качестве выходных данных (через одну секунду):
$ nc localhost 3344
aa
bb
AA <after one second>
Затем, если мы позже введем дополнительную строку, мы получим вывод (через одну секунду), но из предыдущего ввода:
cc
BB <after one second>
Есть идеи, как заставить send()
работать как положено с задержкой Flux
?