При использовании Flux.replay () иногда сообщения теряются при высокой нагрузке - PullRequest
0 голосов
/ 03 февраля 2020

Я использую проектный реактор версии 3.2.10 вместе с версией пружинной ленты 5.1.8.RELEASE .

Я использую UnicastProcessor, как показано ниже, для передачи сообщений от вызовов http на приемник, как показано ниже.

      UnicastProcessor<String> processor = UnicastProcessor.create();
      FluxSink<String> sink = processor.sink(OverflowStrategy.BUFFER);
      Flux<String> postParserFlux = processor.compose(httpMessageParser);
      preparePostParserChain(postParserFlux).subscribe();

При каждом вызове http я выполняю sink.next(httpMsg.body()).

private Flux<String> preparePostParserChain(Flux<String> incomingFlux){

  ConnectableFlux<String> preprocessedFlux =
        incomingFlux
            .filter(preprocessor) - (1)
            .replay();

 Flux<String> flux1 = preprocessedFlux.compose(//some composer); -(2)

 Flux<String> flux2 = preprocessedFlux.compose(//some composer); -(3)

 preprocessedDataFlux.connect();

 return Flux.merge(flux1,flux2);
}

Я заметил, что при огромных нагрузках некоторые из http-сообщений достигают шага (1), но не достигают шагов (2) или (3). Они теряются после воспроизведения ().

Пожалуйста, помогите.

...