Извлечение последних данных из Rx Может быть, когда Rx Observable изменяется - PullRequest
0 голосов
/ 20 марта 2020

У меня есть сценарий использования, в котором я хочу объединить данные из Observable<M> с последним значением из Maybe<N>. API с Maybe - это внешний API, который я не контролирую. По сути, я хочу лениво получить значение из Maybe, когда Observable изменится, и объединить их с помощью функции.

Для простоты, давайте предположим, что M равно Integer и N это String. Я попытался написать следующее:

public interface SomeExternalApi {
  Maybe<String> getLatestString();
}

public interface SomeInternalApi {
  Observable<Integer> getIntegerObservable();
}

public class MyClass {

  private final SomeExternalApi externalApi;
  private final SomeInternalApi internalApi;

  public Disposable subscribe(Consumer<? super IntegerAndString> observer) {
    return internalApi
        .getIntegerObservable()
        .withLatestFrom(
            externalApi.getLatestString().toObservable(),
            integerData, stringData -> new IntegerAndString(integerData, stringData)
        .subscribe(observer); 
  }

  private static class IntegerAndString {
    // data class that holds onto both
  } 
}

Проблема здесь, я думаю, в том, что Observable, сгенерированный externalApi.getLatestString(), просто замораживается с любым значением, которое было при вызове. Таким образом, он продолжает отправлять IntegerAndString независимо от того, каким было исходное значение, возвращаемое из Maybe<String>.

Есть ли способ написать Observable, который лениво выбирает свои данные из Maybe при вызове или какой-нибудь другой шаблон, который я могу использовать здесь?

1 Ответ

1 голос
/ 23 марта 2020

Maybe<T> по определению испускает максимум один элемент.

Я думаю, что вы в основном хотите вызывать externalApi.getLatestString() несколько раз. Я не проверял это, но вы можете попробовать использовать Observable.defer(...) здесь:

      public Disposable subscribe(Consumer<? super IntegerAndString> observer) {
        return internalApi
            .getIntegerObservable()
            .withLatestFrom( Observable.defer(
                () -> externalApi.getLatestString().toObservable() ),
                integerData, stringData -> new IntegerAndString(integerData, stringData)
            .subscribe(observer));
      }
...