Не могу использовать один и тот же DisposableObserver с подпиской дважды с dispose () - PullRequest
0 голосов
/ 22 сентября 2019

Код (Kotlin) очень прост, но я не понимаю, почему я не могу использовать тот же объект для повторной подписки?

val x = Observable.interval(1L, TimeUnit.SECONDS, Schedulers.io())
    .map {
        println("emitting=$it")
        it.toString()
    }.publish().autoConnect()

val o = object : DisposableObserver<String>() {
    override fun onComplete() {}
    override fun onNext(t: String) = println("O:=$t")
    override fun onError(e: Throwable) {}
}
println("---------- subscribe ----------")
val s2 = x.subscribeWith(o)

sleepSeconds(2)
println("---------- dispose ----------")
s2.dispose()

sleepSeconds(2)
println("---------- subscribe again ----------")
x.subscribeWith(o) //<<-- This doesn't work!!!!

sleepSeconds(5)

Вывод на консоль:

---------- subscribe ----------
emitting=0
O:=0
emitting=1
O:=1
---------- dispose ----------
emitting=2
emitting=3
---------- subscribe again ----------
emitting=4
emitting=5
etc.....

Хорошо работает, когда я создаю новый экземпляр класса DisposableObserver.

1 Ответ

1 голос
/ 22 сентября 2019

DisposableObserver предназначен для подписки только один раз javadoc .Когда вы заглянете под капот DisposableObserver, вы увидите AtomicReference upstream, который сохраняет текущий доступный ресурс при подписке наблюдателя.

public abstract class DisposableObserver<T> implements Observer<T>, Disposable {

    final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();

    @Override
    public final void onSubscribe(@NonNull Disposable d) {
        if (EndConsumerHelper.setOnce(this.upstream, d, getClass())) {
            onStart();
        }
    }
// rest of code

EndConsumerHelper.setOnce убедитесь, что DisposableObserver подписался только один раз.Если upstream был удален, то другие upstream не могут быть установлены.

public static boolean setOnce(AtomicReference<Disposable> upstream, Disposable next, Class<?> observer) {
        ObjectHelper.requireNonNull(next, "next is null");
        if (!upstream.compareAndSet(null, next)) { 
            next.dispose();  // dispose next if there is set upstream previously 
            if (upstream.get() != DisposableHelper.DISPOSED) {
                reportDoubleSubscription(observer);
            }
            return false;
        }
        return true;  
    }

Вот почему вы не можете повторно подписаться с тем же экземпляром DisposableObserver, но будете работать с новым экземпляром.

...