Похоже, есть открытая проблема: https://github.com/reactor/reactor-core/issues/1397
В любом случае, я нашел решение для своей ситуации: block()
. Имейте в виду, что эта операция разрешена только для потоков, которые не помечены как «только неблокирующие операции». (См. Также Project Blockhound )
Напомним, проблема в том, что в какой-то момент у меня есть Flux<Mono<T>>
и .flatMap(...)
, .concatMap(...)
и т.д. c. использовать какую-то нетерпеливую выборку. Flux<Mono<T>>
, используемый для тестирования:
final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
() -> 0,
(state, sink) -> {
state += 1;
sink.next(Mono.just(state));
return state;
}).doOnRequest(i -> System.out.println("Requested: " + i))
.doOnNext(v -> System.out.println("Emitted: " + v));
Чтобы не было нетерпеливого извлечения, я теперь создаю блок внутри карты, и он работает на удивление хорошо:
monoFlux.map(Mono::block)
.subscribe(new MySubscriber<>());
Результат:
Requested: 3
Emitted: MonoJust
Received: 1
Emitted: MonoJust
Received: 2
Emitted: MonoJust
Received: 3