Я хочу, чтобы ТОЛЬКО воспроизводимые элементы обрабатывались отдельно. Как этого добиться с помощью пружинного реактора? До сих пор я пришел с этим решением, которое довольно плохое, потому что оно предполагает, что в течение 50 миллисекунд новый элемент не появится.
ConnectableFlux<MovieDTO> replayedFlux = webSocketSink.replay(500);
replayedFlux.connect();
Flux<MovieDTO> replayedFluxFiltered = replayedFlux.take(Duration.ofMillis(50L))
...; // do some processing, filtering, distinct etc.
Flux<MovieDTO> ff = Flux.concat(replayedFluxFiltered, webSocketSink);