Обычный метод onSubscribe наблюдателей можно вызвать столько же, сколько вы используете этого же наблюдателя. но с DisposableObserver
onSubscibe
метод может быть вызван только один раз, что означает, что вы можете использовать этот DisposableObserver
только один раз. если вы передадите один объект DisposableObserver
в два потока, он выдаст исключение и быстро закроет их обоих. эта логика была реализована в onSubscribe()
этого класса, поэтому вы не можете переопределить ее. но в случае, если вам нужен обратный вызов onSubscribe()
, вы можете переопределить метод onStart()
, который такой же.
Использование этого класса может быть следующим:
Согласно документации DisposableObserver:
Абстрактный Observer, который позволяет асинхронное отмену путем реализации Disposable.
другими словами, это означает, что вы можете использовать одноразовые поведения внутри ваших методов-наблюдателей. например, dispose()
в onNext()
.
Observable.just(1, 2, 3, 4, 5)
.map {
"$it"
}
.subscribe(object : DisposableObserver<String>(){
override fun onComplete() {
}
override fun onNext(t: String) {
println("first item only= $t")
//we can dispose this stream right in the observer methods
dispose()
}
override fun onError(e: Throwable) {
}
})
Можно даже объединить DisposableObserver
с subscribeWith()
, чтобы получить почти то же поведение, что и обычные наблюдатели.
val disposableObserver = object : DisposableObserver<String>() {
override fun onComplete() {
}
override fun onNext(t: String) {
println("first item only= $t")
//we can dispose this stream right in the observer methods
dispose()
}
override fun onError(e: Throwable) {
}
}
val disposable: Disposable = Observable.just(1, 2, 3, 4, 5)
.map {
"$it"
}
.subscribeWith(disposableObserver)
disposable.dispose()
Этои многие другие классы и операторы в RX-Java существуют для облегчения использования RX, и в зависимости от того, как вы хотите использовать библиотеку, может быть выбран любой из них.