Возобновить Flowable на TimeoutException в RxJava - PullRequest
0 голосов
/ 09 мая 2018

У меня есть текучая установка, подобная этой

            getFlowable()
            .timeout(5, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .onErrorResumeNext(throwable -> {
                if(throwable instanceof TimeoutException) {
                    view.hideLoading();
                    return Flowable.just(new WorkExperience());
                }

                return Flowable.error(throwable);
            })
            .subscribe(workExperience -> {
                // do something
            }, throwable -> {
                Timber.e(throwable);
                view.hideLoading();
            }));

Обратите внимание на использование операторов timeout и onErrorResumeNext. То, что я пытаюсь здесь, - это продолжать получать события из исходного потока, если есть тайм-аут после игнорирования исключения тайм-аута. Я не уверен, что это правильный подход, потому что он не работает. Я больше не получаю события после истечения времени ожидания. Я могу использовать некоторые указатели здесь. Спасибо!

Ответы [ 3 ]

0 голосов
/ 09 мая 2018

Этого можно добиться, переместив оператор timeout и onErrorResumeNext в другую переменную:

private static Flowable<Integer> getFlowable() {
    return Flowable.range(1, 5)
        .concatMap(integer ->
            Flowable.just(integer)
                //the timer operator here is to simulate a long operation
                .concatMap(v -> Flowable.zip(Flowable.just(v), Flowable.timer(v, TimeUnit.SECONDS), (value, time) -> value))
                .timeout(3, TimeUnit.SECONDS)
                .onErrorResumeNext(throwable -> {
                    if (throwable instanceof TimeoutException) {
                        return Flowable.just(-1);
                    }

                    return Flowable.error(throwable);
                }));
}

public static void main(String args[]) {
    CountDownLatch latch = new CountDownLatch(1);

    getFlowable()
        .subscribe(workExperience -> {
            System.out.println("value = " + workExperience);
        }, throwable -> {
            throwable.printStackTrace();
        }, () -> {
            latch.countDown();
        });

    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
0 голосов
/ 09 мая 2018

Вот как я это решил

        Flowable<WorkExperience> flowable = getFlowable();
        Flowable.just(1)
                .flatMap(i -> flowable)
                .timeout(5, TimeUnit.SECONDS)
                .onErrorResumeNext(throwable -> {
                    if (throwable instanceof TimeoutException) {
                        view.hideLoading();
                    }

                    return Flowable.empty();
                })
                .subscribe(i -> {
                }, Timber::e)

       flowable.subscribe(workExperience -> {
                if (!TextUtils.isEmpty(workExperience.getCompany())) {
                    view.populateExperiences(workExperience);
                }
            }, throwable -> {
                Timber.e(throwable);
                view.hideLoading();
            })
0 голосов
/ 09 мая 2018

Если вы хотите продолжать наблюдать за источником, не прерывайте его. Я предполагаю, что вы действительно хотите получать живые уведомления каждые 5 секунд, чтобы пользователь не думал, что приложение зависло. Для этого просто объедините таймер, сопоставленный со значением «занят»:

getFlowable()
.mergeWith(Flowable.interval(5, TimeUnit.SECONDS).map(v -> new WorkExperience()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
    workExperience -> {
        // do something
    }, 
    throwable -> {
        Timber.e(throwable);
        view.hideLoading();
    }
);
...