RxJava 2.x - Observable, получающий flatMap, запускаемый PublishSubject и объединяемый с другими Observables, не подписывается / не выполняется - PullRequest
1 голос
/ 07 октября 2019

У меня есть решение для нумерации страниц, использующее PublishSubject, которое выглядит следующим образом:

    private val pages: PublishSubject<Int> = PublishSubject.create()
    val observable: Observable<List<Data> = pages.hide()
        .filter { !inFlight }
        .doOnNext { inFlight = true }
        .flatMap{
            getPage(it) // Returns an Observable
        }
        .doOnNext(::onNextPage) // inFlight gets reset here

Это Observable объединено и отсканировано с другими Observable ´ как это:

    fun stateObservable(): Observable<SavedState> {
        return Observable.merge(listOf(firstPage(),
            nextPage(),// The observable listed above
            refresh()))
            .scan(MyState.initialState(), StateReducer::reduce)
    }

По сути, у меня есть однонаправленная установка, где каждое наблюдаемое обновляет MyState с соответствующими изменениями с помощью функции аккумулятора reduce.

В ViewModel это расходуется в прямом направленииway:

        interactor.stateObservable()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeBy(onNext = ::render, onError = Timber::e)
            .addTo(subscriptions)

Эта настройка хорошо работает как для firstPage, так и для refresh (также запускается с помощью PublishSubject), но по какой-то причине решение по поисковому вызову аналогично возврату getPage Observable в flatMap, но тогда эта страница Observable никогда не будет запущена / подписана, и doOnNext после flatMap, очевидно, тоже не будет вызван. Кажется, что он в основном не хочет подписываться на него, и я просто не знаю, почему.

Функция getPage выглядит следующим образом:

    private fun getPage(page: Long): Observable<PartialState<SavedState>> {
        return repo.getPage(page).firstOrError().toObservable()
            .subscribeOn(Schedulers.io())
            .map<PartialState<MyState>> { NextPageLoaded(it) }
            .onErrorReturn { NextPageError(it) }
            .startWith { NextPageLoading() }
    }

getPage inрепозиторий конвертирует RxJava 1 Observable в RxJava2 Observable с помощью RxJavaInterop следующим образом:

    public io.reactivex.Observable<List<Data>> getPage(long page) {
        Observable<List<Data>> observable = getPage(page)
                .map(dataList -> {
                    if(dataList == null){
                        dataList = new ArrayList<>();
                    }
                    return dataList;
                });

        return RxJavaInterop.toV2Observable(observable);
    }

Я не получаю никаких ошибок, поэтому вы можете исключить это.

У меня уже есть та же самая установка с RxJava 1, где она работает очень хорошо, и теперь, когда я перехожу на 2.x, я ожидал, что то же решение будет работать, но я полностью застрял в этой проблеме разбивки на страницыи во всех других сценариях установка работает должным образом.

Чтобы проверить проблему, я загрузил пример проекта на GitHub , демонстрирующий проблему.

ЛюбойЭксперт RxJava, который знает, что это может быть? :)

Спасибо

1 Ответ

3 голосов
/ 11 октября 2019

Я обнаружил проблему: чрезмерное использование {} для создания лямбды для startWith, которая ничего не делает (и поэтому никогда не переключается на цепочку страниц) в nextPageObservable.

.startWith { NextPageLoading() }

Вместо этого:

.startWith ( NextPageLoading() )

Однако с этим изменением ваш код вызывает IAE из-за нулевого значения где-то еще:

java.lang.IllegalArgumentException: Parameter specified as non-null is null: method kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull, parameter page
    at com.paginationissue.ui.list.PaginationIssueInteractor$1.invoke(Unknown Source:2)
    at com.paginationissue.ui.list.PaginationIssueInteractor$1.invoke(PaginationIssueInteractor.kt:15)
    at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:21)
    at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:12)
    at com.paginationissue.paging.Pager.onNextPage(Pager.kt:92)
...