RxJava2 & Retrofit: как получить страницы данных - PullRequest
0 голосов
/ 26 мая 2018

Цель: Я хочу многократно вызывать сервис Retrofit (GET), который возвращает выгружаемые данные, пока я не исчерпал их страницы.Переходя от страницы 0 к странице №.

Сначала я посмотрел на эти два ответа.Первый действительно работает, но я не слишком люблю рекурсивное решение, так как оно может привести к переполнению стека.Второй сбой в тот момент, когда вы пытаетесь использовать планировщик.

Вот пример второго:

Observable.range(0, 5/*Integer.MAX_VALUE*/) // generates page values
    .subscribeOn(Schedulers.io())           // need this to prevent UI hanging
    // gamesService uses Schedulers.io() by default
    .flatMapSingle { page -> gamesService.getGames(page) }
    .takeWhile { games -> games.isNotEmpty() } // games is a List<Game>
    .subscribe(
        { games -> db.insertAll(games) },
        { Logger.e(TAG, it, "Error getting daily games: ${it.message}") }
    )

Что я ожидаю , это остановит моментчто gamesService.getGames(page) возвращает пустой список.Вместо этого он продолжает попадать в конечную точку в течение неопределенного количества раз с увеличением значений страницы.Я немного поэкспериментировал в модульных тестах с Single.just(intVal) и решил, что проблема заключается в том, что моя служба автоматически подписывается на Schedulers.io().Вот как я определяю свои сервисы Retrofit:

private inline fun <reified T> createService(okClient: OkHttpClient): T {
    val rxAdapter = RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io())
    val retrofit = Retrofit.Builder()
        .baseUrl(config.apiEndpoint.endpoint())
        .client(okClient)
        .addCallAdapterFactory(rxAdapter)
        .addConverterFactory(moshiConverterFactory())
        .build()

    return retrofit.create(T::class.java)
}

Это действительно не вариант , а не использование createWithScheduler() здесь.

Вот еще одна идея, которую я попробовал:

val atomic = AtomicInteger(0)
Observable.generate<Int> { it.onNext(atomic.getAndIncrement()) }
    .subscribeOn(Schedulers.io())
    .flatMapSingle { page -> gamesService.getGames(page) }
    .takeWhile { games -> games.isNotEmpty() }
    .subscribe(
        { games -> dailyGamesDao.insertAll(games) },
        { Logger.e(TAG, it, "Error getting daily games: ${it.message}") }
    )

Это еще один случай, когда он работал, как ожидалось, вплоть до того, как я ввел Scheduler.Генератор генерирует way слишком много значений, когда я ожидаю, что он остановится, когда takeWhile обнаружит пустой список.

Я также пробовал различные виды concat (concatWith, concatMap и т. д.).

На данный момент я просто ищу кого-то, кто помог бы мне исправить очевидное (для них) и совершенно базовое недоразумение, которое у меня явно есть с операторами RxJava.

1 Ответ

0 голосов
/ 29 мая 2018

Я нашел частичное решение.(Я могу отредактировать этот ответ позже, если и когда найду свое «окончательное» решение.)

tl; dr Я должен преобразовать свои Single s в Observable s и использоватьПерегрузка flatMap, которая принимает параметр maxConcurrency.Например:

Observable.range(0, SOME_SUFFICIENTLY_LARGE_NUMBER)
    .subscribeOn(Schedulers.io())
    .flatMap({ page -> gamesService.getGames(page).toObservable }, 1 /* maxConcurrency */)
    .takeWhile { games -> games.isNotEmpty() }
    .subscribe(
        { games -> dailyGamesDao.insertAll(games) },
        { Logger.e(TAG, it, "Error getting daily games: ${it.message}") }
    )

Это в основном делает это.Ограничив число одновременных потоков до 1, я теперь получаю «одно за другим» поведение, которое искал.Единственное, что мне не нравится в этом, и я предполагаю, что это небольшая неприятность, это то, что моя база Observable.range() все еще может излучать лот значений - гораздо больше, чем когда-либо привыкшие к нижестоящимSingle s / Observable s.

PS: Одна из причин, по которой я не смог найти это решение раньше, - это использование RxJava 2.1.9.Когда я подтолкнул его к 2.1.14, у меня был доступ к новым перегрузкам.Ну хорошо.

...