RxJava 2 - как отменить бесконечный поток при ошибке и обработать его? - PullRequest
0 голосов
/ 05 июля 2018

У меня есть следующий бесконечный поток, который делает что-то каждую секунду.
Я хочу остановить поток при ошибке и обработать его.
Как мне этого добиться?

void doSomething() {
        Disposable disposable = execute(doSomethingInner(), 0L, TimeUnit.SECONDS, schedulerProvider.io(), someClass -> 1L).doOnError
                (throwable -> {
            Timber.e(throwable, "error happened");// Never triggered
        })
                .doOnNext(someClass -> Timber.i("doing the infinite stuff"))
                .subscribe(Functions.emptyConsumer(), throwable -> {
                    Timber.e(throwable, "stop doing the infinite stuff");// Never triggered
                });
    }

    Observable<SomeClass> doSomethingInner() {
        return Observable.error(new Exception("something went wrong"));
    }

    Observable<SomeClass> execute(Observable<SomeClass> source,
                                  long delayInterval,
                                  TimeUnit timeUnit,
                                  Scheduler scheduler,
                                  Function<SomeClass, Long> interval) {
        return Observable.defer(new Callable<ObservableSource<SomeClass>>() {
            long currentInterval = delayInterval;

            @Override
            public ObservableSource<SomeClass> call() {
                return Single.timer(currentInterval, timeUnit, scheduler)
                        .flatMapObservable(o -> source)
                        .doOnNext(t -> currentInterval = interval.apply(t));
            }
        })
                .repeat()
                .retry();
    }

1 Ответ

0 голосов
/ 05 июля 2018

Я думаю retry() поглотит вашу ошибку.
Попробуйте либо:

  • Удалить это retry() полностью
  • или измените его на retry(Predicate<Throwable>), чтобы решить, следует ли повторять.

По умолчанию подписчик отменяет поток при ошибке, если вы не используете его ранее, и вам необходимо получить обратный вызов на onError() внутри subscribe().

...