RXJava 2.0 глубоко вложенная цепочка, не выполняющая части - PullRequest
0 голосов
/ 16 октября 2018

У меня есть следующий код RXJava 2.0:

private fun <T> wrapApiRequestSingle(apiCall: () -> Single<T>, token: Token) : Single<T> =
        Single.defer {
            apiCall.invoke()
        }.retryWhen { obsError ->
            obsError.flatMap<Single<T>> { error ->
                when (error) {
                    is TokenExpiredException -> {
                        userRepository.getLoggedInUser().toFlowable().flatMap { userOptional ->
                            Publisher<Single<T>> {
                                if (userOptional.isPresent) {
                                    mobileRemote.swapRefreshTokenForAccessToken(token.refreshToken, userOptional.get().emailAddress)
                                            .onErrorResumeNext { refreshError ->
                                                EventReporter.e(TAG, "Failed to refresh JWT.", refreshError)
                                                tokenUseCases.deleteToken().andThen(preferences.singleOrError().flatMap { prefs ->
                                                    prefs.apply {
                                                        this.pushRegistrationId = ""
                                                        this.token = null
                                                    }.apply()

                                                    Single.error<Token>(NoLoggedInUserException())
                                                })
                                            }
                                } else {
                                    EventReporter.e(TAG, "No user was logged in.", error)
                                    tokenUseCases.deleteToken().andThen(preferences.singleOrError().flatMap { prefs ->
                                        prefs.apply {
                                            this.pushRegistrationId = ""
                                            this.token = null
                                        }.apply()

                                        Single.error<Token>(NoLoggedInUserException())
                                    })
                                }
                            }
                        }
                    }
                    else -> {
                        Flowable.error(error)
                    }
                }
            }
        }

По идее, все вызовы API будут обернуты этой функцией.Эта функция имеет 4 основных пути выполнения:

  1. Вызов завершается
  2. Вызов завершается неудачно из-за TokenExpiredException, код возвращается к попытке обновления, если и только если,есть залогиненный пользователь.Обновление выполнено успешно, и исходный вызов выполняется снова.
  3. Вызов не выполняется из-за TokenExpiredException, код возвращается к попытке обновления, если и только если существует зарегистрированный пользователь.Если обновление не удалось, удалите некоторые локальные данные и верните Single, содержащий NoLoggedInUserException.
  4. Вызов не удался, и пользователь не вошел в систему, поэтому удалите некоторые локальные данные и верните Single, содержащийa NoLoggedInUserException.

Код компилируется, и я прочитал документы по всем функциям, которые я использую, но во время выполнения не удается вернуть Single.error(NoLoggedInUserException) для 4-го случая.

Я решил написать контрольный пример для проверки 4-го пути без необходимости использования реального API или использования каких-либо реальных служб.Вот мой тестовый код (он использует Mockito для макетирования различных подсистем, таких как mobileRemote и tokenUseCases:

/**
 * Set of tests to test the main presenter
 */
class ResourceInteractorTests : RobolectricTestBase() {
    @Mock
    private lateinit var injector: InjectorProvider

    @Mock
    private lateinit var preferences: Preferences

    @Mock
    private lateinit var userRepository: UserStorage

    @Mock
    private lateinit var tokenUseCases: TokenUseCases

    @Mock
    private lateinit var mobileRemote: MobileRemote

    @Before
    fun setup() {
        // Initialize all the mocks in this class
        MockitoAnnotations.initMocks(this)

        whenever(this.injector.providePreferences()).thenReturn(Observable.just(preferences))
        whenever(this.injector.provideUserStorage()).thenReturn(userRepository)
        whenever(this.injector.provideTokenUseCases()).thenReturn(tokenUseCases)
        whenever(this.injector.provideMobileRemote()).thenReturn(mobileRemote)
    }

    /**
     * Test that getLocations ultimately propagates a [NoLoggedInUserException]
     * When the remote call returns a [TokenExpiredException] and there is no logged in user
     */
    @Test
    fun onGetLocationsFailTokenExpiredNoLoggedInUser() {
        // ARRANGE
        whenever(this.tokenUseCases.getToken()).thenReturn(Single.just(Token("", Date(), "")))
        whenever(this.mobileRemote.getLocations("")).thenReturn(Single.error(TokenExpiredException()))
        whenever(this.userRepository.getLoggedInUser()).thenReturn(Single.just(Optional.absent()))
        whenever(this.tokenUseCases.deleteToken()).thenReturn(Completable.complete())

        val interactor = ResourceInteractor(this.injector)

        // ACT
        val shouldBeError = interactor.getLocations().test()
        shouldBeError.awaitTerminalEvent(3, TimeUnit.SECONDS)

        // ASSERT
        shouldBeError.assertError { it is NoLoggedInUserException }
    }
}

Идея состоит в том, что, пока мой вызов API возвращает TokenExpiredException тогда будет нажата блокировка retryWhen (это как я установил контрольные точки в своем коде для проверки). Затем смоделированный userRepository возвращает Optional.absent(), чтобы тестируемый код вошел в блок else вдно (что и происходит). Наконец, макет tokenUseCases возвращает Completable.complete() для deleteTokenOperation, что должно привести к тому, что время выполнения войдет в блок andThen. Однако во время выполненияБлок andThen никогда не достигается, и вся цепочка заканчивается без ошибок. Я не могу понять, почему это происходит, у кого-нибудь есть какие-либо идеи?

РЕДАКТИРОВАТЬ:

Меня спросилипочему я использую Publisher<Single<T>>, это потому, что метод retryWhen для типа Single требует его:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
    return toSingle(toFlowable().retryWhen(handler));
}

В то время как Observable retryWhen не делает:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> retryWhen(
final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
    ObjectHelper.requireNonNull(handler, "handler is null");
    return RxJavaPlugins.onAssembly(new ObservableRetryWhen<T>(this, handler));
}

РЕДАКТИРОВАТЬ 2:

Тестируемый код, который вызывает приватную функцию wrapApiRequestSinglОн здесь (чтобы прояснить вопрос):

override fun getLocations(): Single<Collection<Location>> =
        tokenUseCases.getToken()
                .flatMap { jwt ->
                    wrapApiRequestSingle({
                        mobileRemote.getLocations(jwt.accessToken)
                    }, jwt)
                }

РЕДАКТИРОВАТЬ 3:

Использовал подход TDD для написания функции, полностью запустив ее снова и выполняя тест каждый раз, когда я добавлял новыйлиния.Теперь функция выглядит следующим образом:

private fun <T> wrapApiRequestSingle(apiCall: () -> Single<T>, token: Token) : Single<T> =
        Single.defer {
            apiCall.invoke()
        }.retryWhen { obsError ->
            obsError.flatMap<T> { error ->
                when (error) {
                    is TokenExpiredException -> {
                        userRepository.getLoggedInUser().toFlowable().flatMap { userOptional ->
                            if (userOptional.isPresent) {
                                mobileRemote.swapRefreshTokenForAccessToken(token.refreshToken, userOptional.get().emailAddress).toFlowable()
                                        .onErrorResumeNext {
                                            tokenUseCases.deleteToken().andThen(preferences.toFlowable(BackpressureStrategy.BUFFER).flatMap { prefs ->
                                                prefs.apply {
                                                    this.pushRegistrationId = ""
                                                    this.token = null
                                                }.apply()

                                                Flowable.error<T>(NoLoggedInUserException())
                                            })
                                        }
                            } else {
                                EventReporter.e(TAG, "No user was logged in.", error)
                                tokenUseCases.deleteToken().andThen(
                                        preferences.toFlowable(BackpressureStrategy.BUFFER).flatMap { prefs ->
                                            prefs.apply {
                                                this.pushRegistrationId = ""
                                                this.token = null
                                            }.apply()

                                            Flowable.error<T>(NoLoggedInUserException())
                                        })
                            }
                        }
                    } else -> {
                        Flowable.error<T>(error)
                    }
                }
            }
        }

Однако есть проблема с компилятором, выясняющим, как решить, какую перегрузку onErrorResume затем я хочу вызвать.Я попытался явно объявить перегрузку, указав тип для параметра lambda, но компилятор все еще жалуется на неоднозначный тип eval.

1 Ответ

0 голосов
/ 16 октября 2018

Из моего комментария я имел в виду следующее:

private fun <T> wrapApiRequestSingle(apiCall: () -> Single<T>, token: Token) : Single<T> =
    Single.defer {
        apiCall.invoke()
    }.retryWhen { obsError ->
        obsError.flatMap<T> { error ->    // <---------------------------------------
            when (error) {
                is TokenExpiredException -> {
                    userRepository.getLoggedInUser()
                    .toFlowable()
                    .flatMap { userOptional ->
                        Publisher<Single<T>> {
                            if (userOptional.isPresent) {
                                mobileRemote.swapRefreshTokenForAccessToken(
                                    token.refreshToken, userOptional.get().emailAddress)
                                .onErrorResumeNext { refreshError ->
                                    EventReporter.e(TAG, "Failed to refresh JWT.", refreshError)
                                    tokenUseCases.deleteToken()
                                    .andThen(preferences
                                        .singleOrError()
                                        .flatMap { prefs ->
                                                prefs.apply {
                                                    this.pushRegistrationId = ""
                                                    this.token = null
                                                }.apply()

                                                Single.error<Token>(NoLoggedInUserException())
                                            })
                                        }
                            } else {
                                EventReporter.e(TAG, "No user was logged in.", error)
                                tokenUseCases.deleteToken()
                                .andThen(preferences
                                    .singleOrError()
                                    .flatMap { prefs ->
                                        prefs.apply {
                                            this.pushRegistrationId = ""
                                            this.token = null
                                        }.apply()

                                    Single.error<Token>(NoLoggedInUserException())
                                })
                            }
                        }
                    }.flatMapSingle { it } // <------------------------------------
                }
                else -> {
                    Flowable.error(error)
                }
            }
        }
    }
...