В реактивных потоках «необязательные» обычно обрабатываются путем удаления отсутствующих элементов из потока (например, пустого Mono
или Flux
с пропущенным элементом.), Вместо того, чтобы иметь Flux<Optional>
, Mono<Optional>
или Flux<Mono>
При вызове синхронного метода getBaz
вы можете использовать одну операцию .handle
, например:
flux
.handle((foo, sink) -> {
try {
// propagate Baz down the stream
sink.next(getBaz(foo));
} catch (IOException e) {
// Since sink.next is not called here,
// the problematic element will be dropped from the stream
log.error(e);
}
})
При вызове асинхронного getBazRx
метод (возвращая Mono
), вы можете использовать onErrorResume
внутри flatMap
/ flatMapSequential
/ concatMap
, например:
flux
.flatMap(foo -> getBazRx(foo)
.onErrorResume(t -> {
log.error(t);
return Mono.empty();
}))
(или вы можете переместить .onErrorResume
внутрь .getBazRx
, в зависимости от того, где вы хотите поймать и игнорировать исключение)
Кроме того, поскольку вы упомянули об этом в своем вопросе ... если вы хотите создать getBazRx
, который охватывает getBaz
, вам следует никогда делать что-то подобное, если getBaz
имеет потенциал для блокировки:
Mono<Baz> getBazRx(Foo foo) {
// BAD!!!
try {
return Mono.just(getBaz(foo));
} catch (IOException e) {
return Mono.error(e) // or Mono.empty() if you want to ignore
}
}
Эта реализация на самом деле является просто синхронным методом олицетворением асинхронного метода.Есть две проблемы с ним:
- Работа выполняется немедленно, а не после подписки на возвращенные
Mono
- Если
getBaz
блокируется, вы можете заблокировать событиеloop
Вместо этого вы должны отложить работу до подписки на моно и запустить любую операцию блокировки на Scheduler
, предназначенную для операций блокировки, например:
Mono<Baz> getBazRx(Foo foo) {
return Mono.fromSupplier(() -> {
try {
return getBaz(foo);
} catch (IOException e) {
throw Exceptions.propagate(e); // or return null to ignore and complete empty
}
})
.subscribeOn(Schedulers.elastic()); // run on a scheduler suitable for blocking work
}