Не забудьте ПОДПИСАТЬСЯ на меня
В вашем примере вы возвращаете Mono
ошибки.Сложность в том, что fromCallable
ожидает, что вы вернете скалярное значение.
Если мы посмотрим на API Mono.fromCallable
, мы обнаружим, что принятым параметром является
public static <T> Mono<T> fromCallable(Callable<? extends T> supplier)
, что означает, что если Callable
вернет Mono
, мы получим
Mono<Mono<Object>> monoOfMono = Mono.fromCallable(() ->
Mono.error(NullPointerException::new)
);
Таким образом, в случае, если нам нужно произвести ошибку, мы должны выбросить это исключение непосредственно в лямбду
Mono<Object> justMono = Mono.fromCallable(() -> {
throws new NullPointerException()
});
Подводя итог, Mono.fromCallable
не пытается проверить, является ли тип возвращаемого значения потоком.Таким образом, ваш Mono
считается нормальным скалярным значением и распространяется вниз по течению.Итак, чтобы исправить это, вы можете сделать следующее:
Выдает исключение в вашем tryDemo
методе:
Mono.zip( Mono.fromCallable( ()->tryDemo()), Mono.fromCallable( ()- >tryDemo1()),Mono.fromCallable(()-> tryDemo2() ))
.flatMap( data -> Mono.just( Tuples.of( data.getT1(), data.getT2(),data.getT3() ) ) )
.doOnError( e -> log.error( "Error {}", e.getStackTrace() ) )
.subscribe(T->{log.info("Tuple {}",T.getT2() );});
public String tryDemo() {
log.info( "Data--1" );
throw new NullPointerException();
//return "1";
}
Заменить fromCallable
на defer
Вы можете достичь лени, чего вы хотели достичь с помощью fromCallable
, используя оператор Mono.defer
, который в этом случае ожидал Mono
в качестве типа возврата от лямбды.
Если мы посмотрим на API этогоОператор, мы будем наблюдать следующее
public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier);
в этом случае, у нас есть Supplier
, который ожидает ровно Mono
чего-то в качестве возвращаемого типа, поэтому, как только вы попробуете свой исходный код снова, вы получите ожидаемыйповедение:
Mono<Object> justMono = Mono.defer(() ->
Mono.error(NullPointerException::new)
);
В этом случае, когда поставщик возвращает Mono
, Mono.defere
подписывается на него и получает сигнал об ошибке:
Mono.zip(
Mono.defer(() -> tryDemo()),
Mono.defer(() -> tryDemo1()),
Mono.defer(() -> tryDemo2())
)
.flatMap(data -> Mono.just(Tuples.of(
data.getT1(),
data.getT2(),
data.getT3()
)))
.doOnError( e -> log.error( "Error {}", e.getStackTrace() ) )
.subscribe(T -> {
log.info("Tuple {}",T.getT2() );
});