Пейджинг с Retrofit2 / RxJava и Single (s) - PullRequest
0 голосов
/ 03 октября 2018

Я пытаюсь реализовать подкачку для моих вызовов Retrofit2 / RxJava.

Я использую следующие версии: -

retrofit2Version = "2.4.0"
rxAndroidVersion = "2.0.2"
rxJavaVersion = "2.1.16"
okhttp3Version = "3.10.0"

Мои вызовы Retrofit API возвращают

Single<Response<ModelObject>>

В настоящее время я извлекаю все данные за один вызов, а затем сохраняю данные в моей локальной базе данных.

, хотя у меня есть два типа данных для извлечения.

Мой существующий процесс RxJavaэто: -

   initialCompletable()
            .doOnSubscribe(compositeDisposable::add)
            .andThen(Completable.defer(Database::delete))
            .andThen(Single.defer(() -> Network.getData(TYPE_ONE)))
            .doOnSuccess(persistData())
            .ignoreElement()
            .andThen(Single.defer(() -> Network.getData(TYPE_THREE)))
            .doOnSuccess(persistData())
            .ignoreElement()
            .retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(counter)))
            .doOnComplete(onComplete)
            .doOnError(onError)
            .doFinally(Finally())
            .blockingAwait();

Мне нужно включить подкачку в вышеприведенное, что-то вроде: -

   initialCompletable()
            .doOnSubscribe(compositeDisposable::add)
            .andThen(Completable.defer(Database::delete))

    //Repeat these two steps until no data is returned, incrementing offset by limit each time.
            .andThen(Single.defer(() -> Network.getData(TYPE_ONE, offset, limit)))
            .doOnSuccess(persistData())

            .ignoreElement()

    //Repeat these two steps until no data is returned, incrementing offset by limit each time.
            .andThen(Single.defer(() -> Network.getData(TYPE_THREE, offset, limit)))
            .doOnSuccess(persistData())

            .ignoreElement()
            .retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(counter)))
            .doOnComplete(onComplete)
            .doOnError(onError)
            .doFinally(Finally())
            .blockingAwait();

Я попробовал следующее, однако мне пришлось использовать Observables, а не Singles.Кроме того, этот подход только когда-либо возвращает две страницы, хотя есть еще данные для извлечения

    initialCompletable().doOnSubscribe(compositeDisposable::add)
            .andThen(Completable.defer(Database::delete))
            .andThen(Observable.defer(() -> getPageAndNext(offset, limit)))
            .concatMap(new Function<Response<ModelObject>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(final Response<ModelObject> response) throws Exception {
                    return Observable.just(response);
                }
            })
            .doOnComplete(onComplete)
            .doOnError(onError)
            .doFinally(Finally())
            .blockingSubscribe();





private ObservableSource<Response<ModelObject>> getPageAndNext(@NonNull final AtomicInteger offset, @NonNull final AtomicInteger limit) {
        return Network.getData(TYPE_ONE, offset.get(), limit.get())
                .toObservable().concatMap(new Function<Response<ModelObject>, ObservableSource<Response<ModelObject>>>() {
                    @Override
                    public ObservableSource<Response<ModelObject>> apply(final Response<ModelObject> response) {

                        if (isResponseErrorFree(response)) {
                            if (isDataRetrieved(response)) {
                                persist(response.body());
                                return Network.getData(TYPE_ONE, offset.addAndGet(limit.get()), limit.get()).toObservable();
                            } else {
                                return Observable.empty();
                            }
                        } else {
                            throw new RuntimeException("Retrieve Failed");
                        }
                    }
                });
    }                

Могу ли я использовать RxJava repeatWhen (), чтобы повторить подраздел моего потока?

Нужно ли мнепреобразовать мои Single (s) в Observables?

ОБНОВЛЕНИЕ

Я исправил мою проблему рекурсии с возвращением только двух "страниц", и у меня есть это рабочее решение.

Мне все еще не нравится переключаться между Single (s) и Observables, я не уверен, что это "имеет значение"

initialCompletable().doOnSubscribe(compositeDisposable::add)
        .andThen(Completable.defer(Database::delete))
        .andThen(Observable.defer(() -> getPageAndNextTypeOne(offset, limit)))
        .ignoreElements()
        .andThen(Completable.create(completableEmitter -> {
            offset.set(0);
            completableEmitter.onComplete();
        }))
        .andThen(Observable.defer(() -> getPageAndNextTypeThree(offset, limit)))
        .ignoreElements()
        .retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(retryCounter)))
        .doOnComplete(onComplete)
        .doOnError(onError)
        .doFinally(Finally())
        .blockingAwait();

}

/**
 * @param offset
 * @param limit
 * @return
 */
private ObservableSource<Response<ModelObject>> getPageAndNextTypeOne(@NonNull final AtomicInteger offset, @NonNull final AtomicInteger limit) {
    return Network.getTypeOne(offset.get(), limit.get())
            .toObservable().concatMap((Function<Response<ModelObject>, ObservableSource<Response<ModelObject>>>) response -> {

                if (isResponseErrorFree(response)) {
                    if (isInboxArticlesRetrieved(response)) {
                        persistTypeOne(response.body());
                        offset.addAndGet(limit.get());
                        return Observable.just(response).concatWith(getPageAndNextTypeOne(offset, limit));
                    } else {
                        return Observable.empty();
                    }
                } else {
                    throw new RuntimeException("Failed");
                }
            });
}

/**
 * @param offset
 * @param limit
 * @return
 */
private ObservableSource<Response<ModelObject>> getPageAndNextTypeThree(@NonNull final AtomicInteger offset, @NonNull final AtomicInteger limit) {
    return Network.getTypeThree(offset.get(), limit.get())
            .toObservable().concatMap((Function<Response<ModelObject>, ObservableSource<Response<ModelObject>>>) response -> {

                if (isResponseErrorFree(response)) {
                    if (isReadingListRetrieved(response)) {
                        persistTypeThree(response.body());
                        offset.addAndGet(limit.get());
                        return Observable.just(response).concatWith(getPageAndNextTypeThree(offset, limit));
                    } else {
                        return Observable.empty();
                    }
                } else {
                    throw new RuntimeException("Failed");
                }
            });
}
...