RxJava - как остановить публикацию PublishSubject, даже если вызывается onNext () - PullRequest
0 голосов
/ 25 января 2019

У меня есть взгляд, в котором я звоню следующее:

// class member
var myPublishSubject = PublishSubject.create<SomeObservable>()
// later on in the class somewhere:    
while(true){
   myPublishSubject.onNext(someObservable)
}

Я бы хотел остановить эмиссию, но цикл while продолжится вечно. Поэтому я хочу, чтобы вызов onNext ничего не делал. Но я боюсь, что если я вызову myPublishSubject.onComplete (), то в конечном итоге тема будет нулевой, и я получу NPE. Можно ли просто замолчать, даже если onNext () неоднократно вызывается. Это лучший способ просто отписаться?

1 Ответ

0 голосов
/ 25 января 2019

Несколько заметок

Это довольно редкий случай, но если вы покажете нам свое истинное намерение с помощью Observable, мы можем помочь вам в его разработке, еслине лучше, лучше.

Что вы можете сделать

В моих примерах я использовал только переменную flag, которая довольно проста, ее можно изменить при любом триггереу вас есть для вашего проекта.

Вариант 1

Вы можете напрямую вызвать onComplete на тему издателя

var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()

publishSubject.subscribe({
    currentEmittedItemCount++
    System.out.println(it)
}, {
    System.out.println(it)
})

while (true) {
    // Publish value on the subject
    publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) publishSubject.onComplete()

    // Print indication that the loop is still running
    System.out.println("Still looping")
}

Option2

Вы также можете удерживать ссылку на подписку, а затем утилизировать ее, это немного более семантически, чем предыдущая, поскольку она выполнит кодовый блок без вызова onNext(t), когда ресурсутилизируется.

lateinit var disposable: Disposable // Will hold reference to the subscription
var maxEmittedItemCount = 10
var currentEmittedItemCount = 0
var someStringValue = "Some observable" // Create whatever observable you have
var publishSubject = PublishSubject.create<String>()

disposable = publishSubject.subscribeWith(object : DisposableObserver<String>() {
    override fun onComplete() {
        // Print indication of completion for the subject publisher
        System.out.println("Complete")
    }

    override fun onNext(t: String) {
        // Test flag count synchonizer
        currentEmittedItemCount++

        // Print out current emitted item count
        System.out.println(currentEmittedItemCount)

        // Print current string
        System.out.println(t)
    }

    override fun onError(e: Throwable) {
        // Print error
        System.out.println(e)
    }
})

while (true) {
    // Publish value on the subject
    if (!disposable.isDisposed) publishSubject.onNext(someStringValue)

    // Test flag for trigger
    if (currentEmittedItemCount == maxEmittedItemCount) {
        publishSubject.onComplete() // optional if you need to invoke `onComplete()` block on the subject
        disposable.dispose()
    }

    // Print indication that the loop is still running
    System.out.println("Still looping")
}

Подробнее о

...