Создайте Observable, который генерирует Single при подписке, но останавливает и повторно использует определенное значение - PullRequest
0 голосов
/ 16 ноября 2018

Я использую Retrofit для вызова API, который возвращает Single, и я использую onErrorReturn для преобразования любых исключений в объект по умолчанию. Я хочу, чтобы потребители видели текущее значение, но если текущее значение является объектом по умолчанию, я хочу попробовать запросить API и дополнительно отправить этот результат. Чтобы усложнить ситуацию, у меня может быть несколько подписчиков.

Итак, я знаю, что Retrofit Single должен быть преобразован в правильный Observable поток, а не просто onNext / onComplete, как обычный Single.toObservable, но я не знаю как запросить API и отправить значение обратно моим предыдущим подписчикам, используя только Single из Retrofit.

Прямо сейчас я делаю:

fun request(): Observable<Foo> {
  if (behaviorSubject.value == defaultObject) {
    API
      .request()
      .onErrorReturn(defaultObject)
      .subscribe(behaviorSubject)
  }
  return behaviorSubject
}

Но я знаю, что вызов subscribe нарушает цепочку Rx, поэтому я пытаюсь выяснить, как от этого избавиться.

1 Ответ

0 голосов
/ 12 февраля 2019

спасибо за интересный сценарий.Вот решение, которое, как я считаю, покрывает ваши требования.Он длиннее вашего решения, но должен быть безопасным, когда речь идет о нескольких подписках.

// subscribe to this observable with one or more subscribers
val requestObservable = replayAndRetry(API.request(), defaultObject)

private fun <T> replayAndRetry(request: Single<T>, defaultValue: T): Observable<T> {
    val responses = BehaviorSubject.create<T>()

    val initialRequest = request
            .onErrorReturnItem(defaultValue)
            .doOnSuccess(responses::onNext)
            .ignoreElement()
            .cache() // run the initial request at most once

    val retryWhenNecessary = Maybe
            .fromCallable { if (responses.value == defaultValue) true else null }
            .flatMapCompletable { request
                    .doOnSuccess(responses::onNext)
                    .ignoreElement()
                    .onErrorComplete() // subject already has the default value
            }
            .toObservable<T>().share() // avoid multiple simultaneous retries

    return responses // source for all responses
            .mergeWith(initialRequest) // will run once and then complete
            .mergeWith(retryWhenNecessary) // will check for default item on every subscription
                                           // will not run simultaneous retries
}
...