Почему обработка ошибок не работает с многоразовым потоком? - PullRequest
0 голосов
/ 19 февраля 2020

Допустим, у меня есть простой поток, который что-то делает и имеет состояние, которое я хочу повторно использовать по линии.

Flux<Object> flux = Flux.empty()
    .switchIfEmpty(Mono.error(new RuntimeException("asdf")))
    .doOnError(t -> log.info("Got error {}", t.getMessage()))
    .doOnNext(a -> log.info("bla: {}", a))
    .cache();

Если я сразу подпишусь на этот поток, то обработка ошибок будет работать как Шарм. Однако, когда я снова использую этот поток и подписываюсь на него дважды, он выдает ErrorCallbackNotImplemented.

// reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: asdf
flux.doOnNext(a -> log.info("Doing next"))
    .subscribe();

flux.doOnNext(a -> log.info("Doing next"))
    .subscribe();

Вопрос: возможно ли повторно использовать обработчик ошибок (не извлекая его в метод)? Или я должен указать это каждый раз, когда я разветвляюсь? Любые другие детали обработки ошибок, которые я должен знать

РЕДАКТИРОВАТЬ: После дальнейшего изучения я понял, что это происходит из-за .cache(). Похоже, если я поставлю .doOnError() после .cache() оператора, то это будет работать также.

...