Я использую проектный реактор версии 3.2.10 вместе с версией пружинной ленты 5.1.8.RELEASE .
Я использую UnicastProcessor, как показано ниже, для передачи сообщений от вызовов http на приемник, как показано ниже.
UnicastProcessor<String> processor = UnicastProcessor.create();
FluxSink<String> sink = processor.sink(OverflowStrategy.BUFFER);
Flux<String> postParserFlux = processor.compose(httpMessageParser);
preparePostParserChain(postParserFlux).subscribe();
При каждом вызове http я выполняю sink.next(httpMsg.body())
.
private Flux<String> preparePostParserChain(Flux<String> incomingFlux){
ConnectableFlux<String> preprocessedFlux =
incomingFlux
.filter(preprocessor) - (1)
.replay();
Flux<String> flux1 = preprocessedFlux.compose(//some composer); -(2)
Flux<String> flux2 = preprocessedFlux.compose(//some composer); -(3)
preprocessedDataFlux.connect();
return Flux.merge(flux1,flux2);
}
Я заметил, что при огромных нагрузках некоторые из http-сообщений достигают шага (1), но не достигают шагов (2) или (3). Они теряются после воспроизведения ().
Пожалуйста, помогите.