RxJava: Как постобработать подмножество элементов, но вернуть полный список вызывающей стороне - PullRequest
0 голосов
/ 27 августа 2018

Я пишу REST-клиент, который обращается к удаленному серверу и должен сделать что-то похожее на это:

  • Получить список элементов с удаленного сервера
  • Сравнить полученный список с локальными данными
  • Для любых элементов, которые были изменены локально (возможно, нет), отправьте изменения на сервер
  • Наконец, верните полный список предметов вызывающей стороне

Я использую RxJava (2) и немного борюсь с реализацией. Я думаю, что это должно быть примерно так (псевдокод):

public Single<List<Item>> synchronize()
{
    return server.fetchItems().flatMap(receivedItemList -> {
        List<Single<Item>> syncList = new List<>();
        for (Item item : receivedItemList) {
            if (item.needsSync()) {
                Single<Item> modifiedItem = server.updateItem(item);
                syncList.add(modifiedItem);
            }    
        }

        if (!syncList.empty()) {
            return Single.zip(syncList,
                              items -> {
                                  // Changes have been sync'ed,
                                  // return full list to caller
                                  return receivedItemList;
                              });
        } else {
            return Single.just(receivedItemList);
        }
    });
}

Я не уверен, является ли это правильным подходом: например, функция zipper возвращает полный список, если элементы для вызывающей стороны - я в основном использую zip, чтобы убедиться, что все обновления выполняются до завершения операции. Также тот факт, что я вызываю Single.zip () в одном случае и Single.just () в другом, выглядит немного неправильно.

Это выглядит правильно? Это идиоматический способ сделать это с RxJava? Есть ли лучший подход?

1 Ответ

0 голосов
/ 28 августа 2018

вы близки к способу Rx сделать это, однако некоторые изменения в коде могут немного его улучшить, так что те некоторые примечания к вашему коду, я последую за этим со своей скромной реализацией.

  • каждый раз, когда вы думаете использовать цикл for в Rx, попробуйте оператор fromIterable(), этот оператор генерирует элемент за элементом, чтобы вы могли выполнять свою логику с ним как фильтрация.
  • В Rx много операторов фильтрации, поэтому вам не нужно писать чеки в потоке. наиболее популярным является filter(), который принимает предикат в качестве аргумента и возвращает наблюдаемый тип.
  • Если вы хотите собрать выбросы в одном списке, есть оператор toList(), который выдает весь список.

Теперь к реализации

  public Observable<List<String>> fetchNetworkItems() {
    return Observable.just(Arrays.asList("A", "B", "C")); //maybe retrofit
  }

  public Observable<List<String>> localItems() {
    return Observable.just(Arrays.asList("A", "B", "C")); //maybe Room
  }

  public Completable updateRemoteItem(String localItem) {
    return Completable.create(emitter -> {
      //update logic if things are going well use emitter.onComplete();
      //catch any exception  or errors --> use emitter.onError() to throw it
    });
  }

  public Single<List<String>> updateRemoteItems() {
    return fetchNetworkItems().flatMap(Observable::fromIterable) //iterate over the remote items
        .flatMap(remoteItem -> localItems().flatMap(Observable::fromIterable) //iterate over the local items
            .filter(localItem -> !localItem.equals(remoteItem)) //decide which one need to update
            .flatMap(localItem -> updateRemoteItem(localItem).andThen(Observable.just(localItem))) //update then get the local item
            .defaultIfEmpty(remoteItem) //since no need to update it doesn't matter which item we return, we have access to the remote
        ).toList();
  }

с помощью fromIterable() и flatMap() и операторов фильтрации вы можете функционально реализовать логику вложенного цикла.

Редактировать : следующая реализация, если вы хотите обновить все элементы в параллельно, обратите внимание, что с zip(), если один вызов API не удается, это вызывает все другие вызовы не будут выполнены.

  public Observable<List<String>> fetchNetworkItems() {
    return Observable.just(Arrays.asList("A", "B", "C")) //maybe retrofit
        .subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background

  }

  public Observable<List<String>> localItems() {
    return Observable.just(Arrays.asList("A", "B", "C")) //maybe Room
        .subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
  }

  public Observable<String> updateRemoteItem(String localItem) { //may be retrofit
    return Observable.just(localItem)
        .subscribeOn(io.reactivex.schedulers.Schedulers.io());//do work in background
  }

  public List<Observable<String>> generateApisCall(List<String> remotesToBeUpdated) {
    return Observable.fromIterable(remotesToBeUpdated)
        .map(this::updateRemoteItem)
        .toList()
        .blockingGet();
  }

  public Single<List<String>> getItemsToUpdate() {
    return fetchNetworkItems()
        .doOnNext(strings -> {/*may be save the list to access later*/}) //side effects is not a good thing, but that's best what I thought for now.
        .flatMap(Observable::fromIterable) //iterate over the remote items
        .flatMap(remoteItem -> localItems().flatMap(
            Observable::fromIterable) //iterate over the local items
                .filter(localItem -> (localItem/*.id*/ == remoteItem/*.id*/) /* && remoteItem.old()*/)
            //decide which one need to update
        ).toList();
  }

  public void update() {
    getItemsToUpdate()
        .subscribeOn(io.reactivex.schedulers.Schedulers.io())
        .observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread())
        .subscribe((strings, throwable) -> {
          Observable.zip(generateApisCall(strings), objects -> "success").subscripe(string -> {}, throwable -> {});
        });
  }

это больше похоже на псевдокод, поэтому попробуйте проверить его, и тип Item здесь будет заменен на тип String.

...