Несколько заметок
Это довольно редкий случай, но если вы покажете нам свое истинное намерение с помощью Observable
, мы можем помочь вам в его разработке, еслине лучше, лучше.
Что вы можете сделать
В моих примерах я использовал только переменную flag, которая довольно проста, ее можно изменить при любом триггереу вас есть для вашего проекта.
Вариант 1
Вы можете напрямую вызвать onComplete
на тему издателя
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
publishSubject.subscribe({
currentEmittedItemCount++
System.out.println(it)
}, {
System.out.println(it)
})
while (true) {
// Publish value on the subject
publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()
// Print indication that the loop is still running
System.out.println("Still looping")
}
Option2
Вы также можете удерживать ссылку на подписку, а затем утилизировать ее, это немного более семантически, чем предыдущая, поскольку она выполнит кодовый блок без вызова onNext(t)
, когда ресурсутилизируется.
lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()
disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
override fun onComplete() {
// Print indication of completion for the subject publisher
System.out.println("Complete")
}
override fun onNext(t: String) {
// Test flag count synchonizer
currentEmittedItemCount++
// Print out current emitted item count
System.out.println(currentEmittedItemCount)
// Print current string
System.out.println(t)
}
override fun onError(e: Throwable) {
// Print error
System.out.println(e)
}
})
while (true) {
// Publish value on the subject
if (!disposable.isDisposed) publishSubject.onNext(someStringValue)
// Test flag for trigger
if (currentEmittedItemCount == maxEmittedItemCount) {
publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
disposable.dispose()
}
// Print indication that the loop is still running
System.out.println("Still looping")
}
Подробнее о