Rxjava: наблюдаем на логике удаления наблюдателя - PullRequest
0 голосов
/ 29 июня 2018

У меня есть следующий пример:

private var dis: Disposable? = null

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_login)
    val btn = findViewById<Button>(R.id.btn)
    val btn2 = findViewById<Button>(R.id.btn2)

    btn.setOnClickListener {
        dis = Single.fromCallable {
            Thread.sleep(1000)
            15
        }
        .subscribeOn(Schedulers.io())
        //.observeOn(AndroidSchedulers.mainThread())
        .subscribe({ v ->
            log("Success: $v")
        }, { e ->
            log("Error: $e")
        })
    }

    btn2.setOnClickListener {
        dis?.dispose()
    }
}

fun log(msg: String) {
    Log.d("TAGG", msg + " on " + Thread.currentThread().name);
}

Когда строка ".observeOn (..)" ЗАМЕЧАНА, а я:

  1. пресс батн
  2. нажмите btn2 сразу после нажатия btn

Тогда будет вызван блок кода наблюдателя onError.

Когда строка ".observeOn (..)" ОБНОВЛЕНА, а я:

  1. пресс btn
  2. нажмите btn2 сразу после нажатия btn

Тогда блок кода наблюдателя onError НЕ будет вызван.

Я просто не могу понять такое поведение. В моем понимании эта строка кода не должна влиять на логику вызова onError или нет.

Почему это происходит? Это документированное поведение? Пожалуйста, дайте мне какое-нибудь объяснение или переадресацию.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 02 июля 2018

Если у вас есть линия observeOn и утилизируется одноразовая, RxJava прервет поток перед попыткой доставить результат абоненту.

Вот ссылка на выпуск https://github.com/ReactiveX/RxJava/issues/4673

0 голосов
/ 02 июля 2018

При использовании юнит-теста я не воспроизвожу вашу проблему. С наблюдением или без него я наблюдаю исключение UndeliverableException, которое выдается RxJavaPlugins и вызывается doOnDispose.

Что ты видишь?

@Test
  public void testObserveOnEffect() throws Exception {
    Disposable dis = Single.fromCallable(() -> {
      Thread.sleep(1000);
      return 15;
    })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnDispose(() -> logThread("dispose"))
        .subscribe(v -> logThread(v), e -> logThread(e));

    Single.timer(50, TimeUnit.MILLISECONDS).subscribe(o -> dis.dispose());
    Thread.sleep(2000);
  }
...