Проблема здесь не в вашей реализации zip
, а в поведении по умолчанию PublishSubject
. Но сначала давайте вернемся
Горячие и холодные наблюдаемые
В Rx существует два типа Obervables
, hot
и cold
. Наиболее распространенный тип - cold
наблюдаемый. cold
obervable не начнет излучать значения, пока не будет вызван .subscribe()
.
val obs = Observable.fromIterable(listOf(1, 2, 3, 4);
obs.subscribe { print(it) }
// Prints 1, 2, 3, 4
A hot
observable будет выдавать значения независимо от того, подписан ли на него наблюдатель.
val subject = PublishSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe { print(it) }
subject.onNext(3)
subject.onNext(4)
// Prints 3, 4
Обратите внимание, как 1
и 2
там, где не напечатано. Это связано с тем, что PublishSubject
является hot
наблюдаемым и испускает 1
и 2
перед тем, как на него подписаться.
Вернуться к вашему вопросу
В вашем примере ваши темы публикации излучают 1 и 2 до , на которые они подписаны. Чтобы увидеть их zipped
вместе, переместите свой код.
val currentSubject = PublishSubject.create<Int>()
val maxSubject = PublishSubject.create<Int>()
Log.d("custom", "BINGO!")
val zipped = Observables.zip(currentSubject, maxSubject) { current, max -> "current : $current, max : $max " }
zipped.subscribe(
{ Log.d("custom", it) },
{ Log.d("custom", "BONGO!") },
{ Log.d("custom", "KONGO!") }
)
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onNext(1)
maxSubject.onNext(2)
currentSubject.onComplete()
maxSubject.onComplete()