Я пытаюсь реализовать подкачку для моих вызовов Retrofit2 / RxJava.
Я использую следующие версии: -
retrofit2Version = "2.4.0"
rxAndroidVersion = "2.0.2"
rxJavaVersion = "2.1.16"
okhttp3Version = "3.10.0"
Мои вызовы Retrofit API возвращают
Single<Response<ModelObject>>
В настоящее время я извлекаю все данные за один вызов, а затем сохраняю данные в моей локальной базе данных.
, хотя у меня есть два типа данных для извлечения.
Мой существующий процесс RxJavaэто: -
initialCompletable()
.doOnSubscribe(compositeDisposable::add)
.andThen(Completable.defer(Database::delete))
.andThen(Single.defer(() -> Network.getData(TYPE_ONE)))
.doOnSuccess(persistData())
.ignoreElement()
.andThen(Single.defer(() -> Network.getData(TYPE_THREE)))
.doOnSuccess(persistData())
.ignoreElement()
.retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(counter)))
.doOnComplete(onComplete)
.doOnError(onError)
.doFinally(Finally())
.blockingAwait();
Мне нужно включить подкачку в вышеприведенное, что-то вроде: -
initialCompletable()
.doOnSubscribe(compositeDisposable::add)
.andThen(Completable.defer(Database::delete))
//Repeat these two steps until no data is returned, incrementing offset by limit each time.
.andThen(Single.defer(() -> Network.getData(TYPE_ONE, offset, limit)))
.doOnSuccess(persistData())
.ignoreElement()
//Repeat these two steps until no data is returned, incrementing offset by limit each time.
.andThen(Single.defer(() -> Network.getData(TYPE_THREE, offset, limit)))
.doOnSuccess(persistData())
.ignoreElement()
.retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(counter)))
.doOnComplete(onComplete)
.doOnError(onError)
.doFinally(Finally())
.blockingAwait();
Я попробовал следующее, однако мне пришлось использовать Observables, а не Singles.Кроме того, этот подход только когда-либо возвращает две страницы, хотя есть еще данные для извлечения
initialCompletable().doOnSubscribe(compositeDisposable::add)
.andThen(Completable.defer(Database::delete))
.andThen(Observable.defer(() -> getPageAndNext(offset, limit)))
.concatMap(new Function<Response<ModelObject>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Response<ModelObject> response) throws Exception {
return Observable.just(response);
}
})
.doOnComplete(onComplete)
.doOnError(onError)
.doFinally(Finally())
.blockingSubscribe();
private ObservableSource<Response<ModelObject>> getPageAndNext(@NonNull final AtomicInteger offset, @NonNull final AtomicInteger limit) {
return Network.getData(TYPE_ONE, offset.get(), limit.get())
.toObservable().concatMap(new Function<Response<ModelObject>, ObservableSource<Response<ModelObject>>>() {
@Override
public ObservableSource<Response<ModelObject>> apply(final Response<ModelObject> response) {
if (isResponseErrorFree(response)) {
if (isDataRetrieved(response)) {
persist(response.body());
return Network.getData(TYPE_ONE, offset.addAndGet(limit.get()), limit.get()).toObservable();
} else {
return Observable.empty();
}
} else {
throw new RuntimeException("Retrieve Failed");
}
}
});
}
Могу ли я использовать RxJava repeatWhen (), чтобы повторить подраздел моего потока?
Нужно ли мнепреобразовать мои Single (s) в Observables?
ОБНОВЛЕНИЕ
Я исправил мою проблему рекурсии с возвращением только двух "страниц", и у меня есть это рабочее решение.
Мне все еще не нравится переключаться между Single (s) и Observables, я не уверен, что это "имеет значение"
initialCompletable().doOnSubscribe(compositeDisposable::add)
.andThen(Completable.defer(Database::delete))
.andThen(Observable.defer(() -> getPageAndNextTypeOne(offset, limit)))
.ignoreElements()
.andThen(Completable.create(completableEmitter -> {
offset.set(0);
completableEmitter.onComplete();
}))
.andThen(Observable.defer(() -> getPageAndNextTypeThree(offset, limit)))
.ignoreElements()
.retryWhen(errors -> errors.flatMap(e -> constructRetryHandler(retryCounter)))
.doOnComplete(onComplete)
.doOnError(onError)
.doFinally(Finally())
.blockingAwait();
}
/**
* @param offset
* @param limit
* @return
*/
private ObservableSource<Response<ModelObject>> getPageAndNextTypeOne(@NonNull final AtomicInteger offset, @NonNull final AtomicInteger limit) {
return Network.getTypeOne(offset.get(), limit.get())
.toObservable().concatMap((Function<Response<ModelObject>, ObservableSource<Response<ModelObject>>>) response -> {
if (isResponseErrorFree(response)) {
if (isInboxArticlesRetrieved(response)) {
persistTypeOne(response.body());
offset.addAndGet(limit.get());
return Observable.just(response).concatWith(getPageAndNextTypeOne(offset, limit));
} else {
return Observable.empty();
}
} else {
throw new RuntimeException("Failed");
}
});
}
/**
* @param offset
* @param limit
* @return
*/
private ObservableSource<Response<ModelObject>> getPageAndNextTypeThree(@NonNull final AtomicInteger offset, @NonNull final AtomicInteger limit) {
return Network.getTypeThree(offset.get(), limit.get())
.toObservable().concatMap((Function<Response<ModelObject>, ObservableSource<Response<ModelObject>>>) response -> {
if (isResponseErrorFree(response)) {
if (isReadingListRetrieved(response)) {
persistTypeThree(response.body());
offset.addAndGet(limit.get());
return Observable.just(response).concatWith(getPageAndNextTypeThree(offset, limit));
} else {
return Observable.empty();
}
} else {
throw new RuntimeException("Failed");
}
});
}