Оператор ZIP не работает с PublishSubject, что я делаю не так? - PullRequest
0 голосов

Я новичок в RxJava и не могу понять - почему моя "заархивированная" наблюдаемая не генерирует элементы, когда я использую два PublishSubject с ней?(Насколько я знаю, оператор ZIP должен «объединить» два потока в один)

val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()

currentSubject.onNext(1)
maxSubject.onNext(2)

currentSubject.onNext(1)
maxSubject.onNext(2)

Log.d("custom", "BINGO!")

val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
    { Log.d("custom", it) },
    { Log.d("custom", "BONGO!") },
    { Log.d("custom", "KONGO!") }
)

currentSubject.onComplete()
maxSubject.onComplete()

Я ожидаю, что элементы отображаются в функции "{Log.d (" custom ", it)}", но этого не происходит.Что я делаю не так?

Журнал после компиляции:

2019-06-25 22: 25: 36.802 3631-3631 / ru.grigoryev.rxjavatestdeleteafter D / custom: BINGO!

2019-06-25 22: 25: 36.873 3631-3631 / ru.grigoryev.rxjatest, а после D / custom: KONGO!

1 Ответ

1 голос
/ 25 июня 2019

Проблема здесь не в вашей реализации zip, а в поведении по умолчанию PublishSubject. Но сначала давайте вернемся

Горячие и холодные наблюдаемые

В Rx существует два типа Obervables, hot и cold. Наиболее распространенный тип - cold наблюдаемый. cold obervable не начнет излучать значения, пока не будет вызван .subscribe().

val obs = Observable.fromIterable(listOf(1, 2, 3, 4);
obs.subscribe { print(it) }
// Prints 1, 2, 3, 4

A hot observable будет выдавать значения независимо от того, подписан ли на него наблюдатель.

val subject = PublishSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe { print(it) }
subject.onNext(3)
subject.onNext(4)

// Prints 3, 4

Обратите внимание, как 1 и 2 там, где не напечатано. Это связано с тем, что PublishSubject является hot наблюдаемым и испускает 1 и 2 перед тем, как на него подписаться.

Вернуться к вашему вопросу

В вашем примере ваши темы публикации излучают 1 и 2 до , на которые они подписаны. Чтобы увидеть их zipped вместе, переместите свой код.

val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()

Log.d("custom", "BINGO!")

val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
    { Log.d("custom", it) },
    { Log.d("custom", "BONGO!") },
    { Log.d("custom", "KONGO!") }
)

currentSubject.onNext(1)
maxSubject.onNext(2)

currentSubject.onNext(1)
maxSubject.onNext(2)


currentSubject.onComplete()
maxSubject.onComplete()
...