У меня довольно частое требование выполнять некоторую логику в onSubscribe и onFinally, что может быть достигнуто с помощью
private <T> Mono<T> doAtStartAndEnd(Mono<T> source) {
return source.doOnSubscribe((s) -> {
System.out.println("ON SUBSCRIBE");
}).doFinally((f) -> {
System.out.println("ON FINALLY");
});
}
и используя его через transform
в следующей цепочке:
List<String> result = Mono.fromCallable(() -> getListOfStrings())
// .log()
.flatMapIterable(list -> list)
.map(String::toUpperCase)
.collectList()
.transform(this::doAtStartAndEnd)
.block();
Конечно, ожидаемое поведение таково, что ON SUBSCRIBE
появляется в консоли перед вызовом вызываемого здесь, getListOfStrings()
. Однако логика подписки MonoFlattenIterable
ведет к противоположному поведению. Это относится не только к flatMapIterable
, но и к различным другим операторам, таким как zip
.
Если я раскомментирую строку с .log()
, цепочка будет работать как положено.
Может быть, это точно так же, как в Reactive Gem # 22 * 1019 *, но как мне добиться желаемого поведения, не оборачивая Mono / Flux снова, например? в Mono.defer(() -> Mono.fromCallable(() -> getListOfStrings()))
?