Проблема в том, что вы комбинируете отправку и получение с thenMany
методом на Mono
и Flux
. thenMany
метод заставляет Flux
игнорировать элемент из этого потока и реагировать только на сигнал завершения.
Итак, ничего не произойдет, если вы не вызовете sink.complete()
. Но после вызова метода complete
дальнейшие события отправляться не будут, даже если будет запрошено.
Отправка и получение должны выполняться независимо.
Также вместо ConcurrentLinkedQueue
FluxProcessor
и FluxSink
могут быть использованы. EmitterProcessor
может излучать нескольким подписчикам, сохраняя противодавление для каждого из своих подписчиков. Когда у него нет подписчика, он все еще может принять несколько потоков данных до настраиваемых bufferSize
.
int bufferSize = 10;
FluxProcessor<String, String> processor =
EmitterProcessor.<String>create(bufferSize).serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
sink.next("1");
sink.next("2");
sink.next("3");
WebSocketClient webSocketClient = new ReactorNettyWebSocketClient();
webSocketClient.execute(new URI("wss://echo.websocket.org"),
session -> {
Flux<WebSocketMessage> out = Flux.just("INIT")
.concatWith(processor)
.map(session::textMessage);
session.send(out)
.subscribe(); //instead of thenMany
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(s -> "Received: " + s)
.log()
.then();
})
.subscribe();
for (int i = 1; i <= 10; i++) {
sink.next("MSG #" + i);
TimeUnit.SECONDS.sleep(1);
}
sink.complete();
Журналы:
17:57:54.177 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
17:57:54.178 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - request(unbounded)
17:57:54.304 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: INIT)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 1)
17:57:54.305 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 2)
17:57:54.306 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: 3)
17:57:54.307 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #1)
17:57:54.396 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #2)
17:57:55.454 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #3)
17:57:56.480 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #4)
17:57:57.505 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #5)
17:57:58.412 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #6)
17:57:59.448 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #7)
17:58:00.484 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #8)
17:58:01.496 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #9)
17:58:02.434 [reactor-http-nio-1] INFO reactor.Flux.Map.1 - onNext(Received: MSG #10)