Применение Single к ObservableSource и не перечитывание - PullRequest
0 голосов
/ 26 июня 2019

Я довольно новичок в RX в целом и в rxjava в частности, извините за ошибки.

Эта операция зависит от двух асинхронных операций.

Первая использует функцию фильтра для попыткичтобы получить единственную сущность из списка, возвращаемого асинхронной Наблюдаемой.

Вторая - асинхронная операция, которая связывается с устройством и производит Наблюдаемую за обновлениями состояния.

Я хочу взятьСингл, созданный из функции фильтра, примените его к pairReader(...) и подпишитесь на его Observable для обновлений.Я могу заставить это работать так, как показано, но только если я включу комментарий take(1), в противном случае я получу исключение, потому что цепочка пытается получить другое значение из Single.

  Observable<DeviceCredential> getCredentials() {
    return deviceCredentialService()
            .getCredentials()
            .flatMapIterable(event -> event.getData());
  }

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      // A device is logically constrained to only have a single cred per org
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .take(1)  // Without this I get an exception
      .singleOrError();
  }

  Function<Organization, Observable<Reader.EnrollmentState>> pairReader(String name) {
    return org -> readerService().pair(name, org);
  }

getOrgFromCreds(orgid)
  .flatMapObservable(pairReader(readerid))
  .subscribe(state -> {
     switch(state) {
       case BEGUN:
         LOG.d(TAG, "Pairing begun");
         break;
       case PAIRED:
         LOG.d(TAG, "Pairing success");
         callback.success();
         break;
       case NOTIFIED_SERVER:
         LOG.d(TAG, "Pairing server notified");
         break;
     }},
     error -> {
       Crashlytics.logException(error);
       callback.error(error.getLocalizedMessage());
     });

Ответы [ 3 ]

0 голосов
/ 27 июня 2019

Если я вас правильно понял, вам нужно выполнить какое-то действие, используя ранее извлеченные асинхронные данные. Таким образом, вы можете использовать оператор .zip () . Вот пример:

Observable.zip(
                getOrgFromCreds().toObservable(),
                getCredentials(),
                (first, second) -> /*create output object here*/
)
                .subscribe(
                        (n) -> /*do onNext*/,
                        (e) -> /*do onError*/
                );

Обратите внимание, что оператор .zip() будет ожидать обоих излучений из двух потоков, а затем он создаст внешнее излучение, используя функцию, которую вы предоставили в "создать выходной объект здесь". Если вы не хотите ждать оба элемента - вы можете использовать .combineLatest () .

0 голосов
/ 04 июля 2019

Проблема здесь заключалась в том, что API был разработан странным образом (и, к сожалению, имеет очень плохую документацию).Я не мог понять, почему я получал дубликаты, и подумал, что я неправильно использовал flatMapIterable.

То, что на самом деле создает вызов deviceCredentialService.getCredentials(), - это наблюдаемое, которое испускает DataEvent объекты, которые являются простыми обертками надсписок результатов и с флагом того, откуда пришли результаты.

Разработчик API хотел разрешить пользователю использовать локально кэшированные данные для немедленного заполнения пользовательского интерфейса при выполнении более длинного запроса к API REST.Свойство DataEvent.from - это перечисление, которое помечает источник либо из локального кэша устройства, либо из удаленного вызова API.

Я решил это, просто проигнорировав результаты, поступающие из локального кэша, и выполнив только команду emit.Результаты из API:

  Observable<DeviceCredential> getCredentials() {
    return deviceCredentialService()
      .getCredentials()
      // Only get creds from network
      .filter(e -> e.getFrom() == SyncedDataSourceObservableFactory.From.SOURCE)
      .flatMapIterable(e -> e.getData());
  }

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      // A device is logically constrained to only have a single cred per org
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .singleOrError();
  }

В этом случае планируется использовать запоминание для кэширования сущностей таким образом, чтобы дать реализующему приложению доступ к аннулированию кэша.Поскольку предоставленный интерфейс не позволяет подавлять вызов API, невозможно работать только с кешем, если приложение чувствует, что оно свежо.

0 голосов
/ 26 июня 2019

Если исходный поток испускает более одного элемента, singleOrError() должен выдать ошибку. Doc

В вашем случае используйте вместо first() или firstOrError().

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .firstOrError();
  }

...