Допустим, у меня есть простой поток, который что-то делает и имеет состояние, которое я хочу повторно использовать по линии.
Flux<Object> flux = Flux.empty()
.switchIfEmpty(Mono.error(new RuntimeException("asdf")))
.doOnError(t -> log.info("Got error {}", t.getMessage()))
.doOnNext(a -> log.info("bla: {}", a))
.cache();
Если я сразу подпишусь на этот поток, то обработка ошибок будет работать как Шарм. Однако, когда я снова использую этот поток и подписываюсь на него дважды, он выдает ErrorCallbackNotImplemented.
// reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: asdf
flux.doOnNext(a -> log.info("Doing next"))
.subscribe();
flux.doOnNext(a -> log.info("Doing next"))
.subscribe();
Вопрос: возможно ли повторно использовать обработчик ошибок (не извлекая его в метод)? Или я должен указать это каждый раз, когда я разветвляюсь? Любые другие детали обработки ошибок, которые я должен знать
РЕДАКТИРОВАТЬ: После дальнейшего изучения я понял, что это происходит из-за .cache()
. Похоже, если я поставлю .doOnError()
после .cache()
оператора, то это будет работать также.