Почему Observable.empty () не распространяется на onComplete () ниже по течению? - PullRequest
0 голосов
/ 26 августа 2018

Документация для RxJava Observable.empty() сообщает:

создать Observable, который не испускает никаких предметов, но обычно завершается

«Завершить в обычном режиме» означает вызов onComplete().

Так что в нижестоящем наблюдателе я ожидаю получить onComplete(). Но этого не происходит.

Есть идеи, почему? Означает ли "нормально завершается" что-то еще в этом контексте?

Вот пример кода:

Observable.just(2, 3, 0, 15, 12, 1)
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    if (integer == 0) {
                        return Observable.empty();
                    } else {
                        return Observable.just("Value: " + integer);
                    }
                }
            })
            .subscribe(observer);

observer не получает onComplete().

Ответы [ 2 ]

0 голосов
/ 26 августа 2018

с использованием типа Observable в качестве примера, что на самом деле flatMap делает:

  1. отображает каждый выброс из восходящего потока в Observable<T>,который эффективно преобразует весь поток в Observable<Observable<T>>.чтобы избавить вас от синтаксиса, похожего на «обратный вызов ада», он затем ...

  2. сглаживает этот поток просто Observable<T>

(я всегда думал, что по этой причине имя flatMap кажется обратным)

, поэтому, используя ваш пример ввода, операция map выдаст:

2  --> Observable(2)
3  --> Observable(3) 
0  --> Observable()
15 --> Observable(15)
12 --> Observable(12)
1  --> Observable(1)

в этой точке поток теперь Observable<Observable<Integer>>.

, впоследствии уплощение внутреннее Observable<Integer> излучение дает поток, который фактически совпадает с:

Observable(2, 3, 15, 12, 1)

... так как Observable.empty() ничего не производит.

когда я понял ваш вопрос, вы думали, что излучение 0 эффективно завершит весь поток, но это не так, как работает flatMap,однако, если вы ищете именно такое поведение, я думаю, вам подойдет какой-то вариант оператора switch*.

Надеюсь, это поможет вам в этом!

0 голосов
/ 26 августа 2018

Вероятно, существует другая проблема с вашим кодом, поскольку наблюдаемая пустая функция вызывает onComplete.

Вот простой тест для проверки этого (на kotlin):

class RxObservableTest {

    @Test
    fun checkObservable() {
        Observable.empty<Int>()
                .doOnComplete { println("Received OnComplete") }
                .test()
                .assertComplete()
    }
}

Вывод:

Connected to the target VM, address: '127.0.0.1:63929', transport: 'socket'
Received OnComplete
Disconnected from the target VM, address: '127.0.0.1:63929', transport: 'socket'

Process finished with exit code 0

Обновление

Итак, я думаю, я понимаю вашу проблему сейчас.

Вы ожидаете получать onCompleted после каждого Observable, но этоне так, как работает RxJava.Из документации наблюдателя :

и шаблон вызова должен соответствовать следующему протоколу:

onSubscribe onNext * (onError | onComplete)?

Это означает, что каждый Observer получит не более одного события onComplete и после этого ничего не получит.В этом случае он получен после того, как все Observable в flatMap завершены.В вашем случае последний выдает 1 и именно тот результат, который вы видите.

В случае, если вы хотите получать уведомления о завершении Observable.empty(), вы можете рассмотреть возможность предоставления более сложных данных, которые просто Integer

...