У меня есть решение для нумерации страниц, использующее PublishSubject
, которое выглядит следующим образом:
private val pages: PublishSubject<Int> = PublishSubject.create()
val observable: Observable<List<Data> = pages.hide()
.filter { !inFlight }
.doOnNext { inFlight = true }
.flatMap{
getPage(it) // Returns an Observable
}
.doOnNext(::onNextPage) // inFlight gets reset here
Это Observable
объединено и отсканировано с другими Observable
´ как это:
fun stateObservable(): Observable<SavedState> {
return Observable.merge(listOf(firstPage(),
nextPage(),// The observable listed above
refresh()))
.scan(MyState.initialState(), StateReducer::reduce)
}
По сути, у меня есть однонаправленная установка, где каждое наблюдаемое обновляет MyState
с соответствующими изменениями с помощью функции аккумулятора reduce
.
В ViewModel
это расходуется в прямом направленииway:
interactor.stateObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = ::render, onError = Timber::e)
.addTo(subscriptions)
Эта настройка хорошо работает как для firstPage
, так и для refresh
(также запускается с помощью PublishSubject
), но по какой-то причине решение по поисковому вызову аналогично возврату getPage
Observable
в flatMap
, но тогда эта страница Observable
никогда не будет запущена / подписана, и doOnNext
после flatMap
, очевидно, тоже не будет вызван. Кажется, что он в основном не хочет подписываться на него, и я просто не знаю, почему.
Функция getPage
выглядит следующим образом:
private fun getPage(page: Long): Observable<PartialState<SavedState>> {
return repo.getPage(page).firstOrError().toObservable()
.subscribeOn(Schedulers.io())
.map<PartialState<MyState>> { NextPageLoaded(it) }
.onErrorReturn { NextPageError(it) }
.startWith { NextPageLoading() }
}
getPage
inрепозиторий конвертирует RxJava 1 Observable
в RxJava2 Observable
с помощью RxJavaInterop
следующим образом:
public io.reactivex.Observable<List<Data>> getPage(long page) {
Observable<List<Data>> observable = getPage(page)
.map(dataList -> {
if(dataList == null){
dataList = new ArrayList<>();
}
return dataList;
});
return RxJavaInterop.toV2Observable(observable);
}
Я не получаю никаких ошибок, поэтому вы можете исключить это.
У меня уже есть та же самая установка с RxJava 1, где она работает очень хорошо, и теперь, когда я перехожу на 2.x, я ожидал, что то же решение будет работать, но я полностью застрял в этой проблеме разбивки на страницыи во всех других сценариях установка работает должным образом.
Чтобы проверить проблему, я загрузил пример проекта на GitHub , демонстрирующий проблему.
ЛюбойЭксперт RxJava, который знает, что это может быть? :)
Спасибо