Не уверен в ваших точных требованиях. Но я думаю, что здесь может помочь reduce
функция.
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce("empty", (next, step) -> {
String current = message(step, next);
return execute(current);
})
.map(String::toUpperCase)
.subscribe(System.out::println);
Здесь функции message и execute являются такими:
String message(String step, String next){
return "message[" + step + ":" + next + "]";
}
String execute(String current){
return "execute(" + current + ")";
}
Конечным выводом будет последнее выполненное сообщение.
EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])
Начальный шаг не может быть null
здесь. Вместо этого вы можете использовать пустой шаговый объект и рассматривать его как null
.
Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));
stringFlux
.reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
Flux<String> current = message(step, next);
return current.flatMap(this::execute);
})
.flatMapMany(a -> a)
.subscribe(System.out::println);
Flux<String> message(String step, Flux<String> next){
return next.map(v -> "message(" + v + ":" + step + ")");
}
Mono<String> execute(String current){
return Mono.just("execute(" + current + ")");
}
Вывод:
execute(message(execute(message(execute(message(empty:a)):b)):c))