Как использовать concat с lambda и ObservableSource - PullRequest
4 голосов
/ 20 марта 2020
RxJava2
kotlin 

Это работает нормально, и я могу объединить 2 наблюдаемые

   Observable.concat(countries(), animals())
                    .subscribeBy {
                        println(it)
                    }

Этот пример, который я не могу понять, поскольку он использует лямбду, которая, кажется, принимает ObservableSource, и я хочу объединить 2 наблюдаемые, но это приводит к нулевому исключению. Просто интересно, что я делаю не так с этим. И какова цель использования лямбды с конкатом?

        Observable.concat<String> {
                    it.onNext(countries())
                    it.onNext(animals())
                }.subscribeBy {
                    println(it)
                }


    private fun animals(): Observable<String> =
            Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")

    private fun countries(): Observable<String> =
            Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")

Это кража sh Я получаю:

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)

Это интерфейс для ObservableSource Я думаю, что я имею в виду.

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

Большое спасибо за любые предложения

1 Ответ

5 голосов
/ 22 марта 2020

Передача лямбды в concat разрешается до concat(ObservableSource<? extends ObservableSource<? extends T>> sources). Поскольку ObservableSource является интерфейсом с одним методом, отличным от используемого по умолчанию, это вызывает Kotlin преобразование SAM . Вот почему он выбирает эту перегрузку - он единственный с интерфейсом, который может быть выполнен с помощью преобразования SAM.

Таким образом, лямбда является реализацией метода ObservableSource.subscribe(Observer<? super T> observer). Этот метод задокументирован как:

Подписывает данного Observer на этот экземпляр ObservableSource.

Таким образом, лямбда-выражение должно подписать параметр (it) на источник Observables. Вы получаете NullPointerException, потому что вместо подписки вы начинаете звонить onNext на Observer, который еще не подписан и, следовательно, имеет неправильное внутреннее состояние (в этом случае есть очередь, которая еще не настроена) , но это не особенно важно).

Чтобы выполнить контракт метода, вы просто должны создать Observable, который излучает Observable s, и подписаться it (Observer) на это Observable в лямбде, вот так:

Observable.concat<String> {
    Observable.just(countries(), animals()).subscribe(it)
}.subscribeBy {
    println(it)
}

Я проверил это локально, и он дает ожидаемый результат.

...