Flux.repeat
и Mono.repeat
будут повторно подписываться на источник, поэтому каждый предыдущий шаг источника будет повторяться с новой подпиской.
Так как calculateNextResult
и compareResults
оба являются синхроннымиОперации в вашем примере, вы можете использовать простой цикл for
для повторения ...
public Flux<Boolean> run(some params, Flux<Integer> prevResults){
return prevResults.map(elem -> {
for (int i = 0; i < 5; i++) {
if (compareResults(elem, calculateNextResult(some params))) {
return true;
}
}
return false;
});
}
Если бы calculateNextResult
или compareResults
были реактивными методами, возвращающими Mono
, то вы могли бы использовать flatMap
вместо map
и используйте один из Mono.repeat*
методов.
Например, что-то вроде этого:
private Mono<Integer> calculateNextResult(some params) {
// some implementation
}
private Mono<Boolean> compareResults(int prevRes, int nextRes) {
// some implementation
}
public Flux<Boolean> run(some params, Flux<Integer> prevResults){
return prevResults.flatMap(prevResult ->
calculateNextResult(some params)
.flatMap(nextResult -> compareResults(prevResult, nextResult))
.filter(comparisonResult -> comparisonResult)
.repeatWhenEmpty(Repeat.times(5))
.defaultIfEmpty(false));
}
В этом примере repeatWhenEmpty
вызовет новую подписку на Mono, созданную в flatMap, что приведет к calculateNextResult
пересчитать (при условии, что Mono, возвращенный из calculateNextResult
, настроен для расчета значения для каждой подписки).