вы близки к способу Rx сделать это, однако некоторые изменения в коде могут немного его улучшить, так что те некоторые примечания к вашему коду, я последую за этим со своей скромной реализацией.
- каждый раз, когда вы думаете использовать цикл for в Rx, попробуйте оператор
fromIterable()
, этот оператор генерирует элемент за элементом, чтобы вы могли выполнять свою логику с ним как фильтрация.
- В Rx много операторов фильтрации, поэтому вам не нужно писать чеки в потоке. наиболее популярным является
filter()
, который принимает предикат в качестве аргумента и возвращает наблюдаемый тип.
- Если вы хотите собрать выбросы в одном списке, есть оператор
toList()
, который выдает весь список.
Теперь к реализации
public Observable<List<String>> fetchNetworkItems() {
return Observable.just(Arrays.asList("A", "B", "C")); //maybe retrofit
}
public Observable<List<String>> localItems() {
return Observable.just(Arrays.asList("A", "B", "C")); //maybe Room
}
public Completable updateRemoteItem(String localItem) {
return Completable.create(emitter -> {
//update logic if things are going well use emitter.onComplete();
//catch any exception or errors --> use emitter.onError() to throw it
});
}
public Single<List<String>> updateRemoteItems() {
return fetchNetworkItems().flatMap(Observable::fromIterable) //iterate over the remote items
.flatMap(remoteItem -> localItems().flatMap(Observable::fromIterable) //iterate over the local items
.filter(localItem -> !localItem.equals(remoteItem)) //decide which one need to update
.flatMap(localItem -> updateRemoteItem(localItem).andThen(Observable.just(localItem))) //update then get the local item
.defaultIfEmpty(remoteItem) //since no need to update it doesn't matter which item we return, we have access to the remote
).toList();
}
с помощью fromIterable()
и flatMap()
и операторов фильтрации вы можете функционально реализовать логику вложенного цикла.
Редактировать : следующая реализация, если вы хотите обновить все элементы в
параллельно, обратите внимание, что с zip()
, если один вызов API не удается, это вызывает все
другие вызовы не будут выполнены.
public Observable<List<String>> fetchNetworkItems() {
return Observable.just(Arrays.asList("A", "B", "C")) //maybe retrofit
.subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
}
public Observable<List<String>> localItems() {
return Observable.just(Arrays.asList("A", "B", "C")) //maybe Room
.subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
}
public Observable<String> updateRemoteItem(String localItem) { //may be retrofit
return Observable.just(localItem)
.subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
}
public List<Observable<String>> generateApisCall(List<String> remotesToBeUpdated) {
return Observable.fromIterable(remotesToBeUpdated)
.map(this::updateRemoteItem)
.toList()
.blockingGet();
}
public Single<List<String>> getItemsToUpdate() {
return fetchNetworkItems()
.doOnNext(strings -> {/*may be save the list to access later*/}) //side effects is not a good thing, but that's best what I thought for now.
.flatMap(Observable::fromIterable) //iterate over the remote items
.flatMap(remoteItem -> localItems().flatMap(
Observable::fromIterable) //iterate over the local items
.filter(localItem -> (localItem/*.id*/ == remoteItem/*.id*/) /* && remoteItem.old()*/)
//decide which one need to update
).toList();
}
public void update() {
getItemsToUpdate()
.subscribeOn(io.reactivex.schedulers.Schedulers.io())
.observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread())
.subscribe((strings, throwable) -> {
Observable.zip(generateApisCall(strings), objects -> "success").subscripe(string -> {}, throwable -> {});
});
}
это больше похоже на псевдокод, поэтому попробуйте проверить его, и тип Item
здесь будет заменен на тип String
.