Как обрабатывать распоряжение в RxJava без InterruptedException - PullRequest
0 голосов
/ 30 мая 2019

В приведенном ниже коде, отсекаемом при вызове dispose(), поток эмиттера прерывается (InterruptedException выбрасывается из режима ожидания).

    Observable<Integer> obs = Observable.create(emitter -> {
        for (int i = 0; i < 10; i++) {
            if (emitter.isDisposed()) {
                System.out.println("> exiting.");
                emitter.onComplete();
                return;
            }

            emitter.onNext(i);
            System.out.println("> calculation = " + i);


            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        emitter.onComplete();
    });

    Disposable disposable = obs
            .subscribeOn(Schedulers.computation())
            .subscribe(System.out::println);

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    disposable.dispose();

Из сеанса отладки я вижу, что прерывание происходит от FutureTask, который отменяется во время удаления. Там поток, вызывающий dispose(), проверяется на поток бегуна, и, если он не совпадает, эмиттер прерывается. Поток отличается, так как я использовал вычисления Scheduler.

Есть ли способ заставить распоряжение не прерывать такой излучатель, или это так, как это всегда должно быть обработано? Проблема, с которой я сталкиваюсь при таком подходе, заключается в том, что у меня была бы прерываемая операция (смоделированная здесь сном), которую я хотел бы выполнить нормально перед вызовом onComplete().

1 Ответ

2 голосов
/ 30 мая 2019

Пожалуйста, обратитесь к Что отличается от 2.0 - Обработка ошибок .

Одним из важных требований к дизайну для 2.x является отсутствие ошибок Throwable. Это означает, что ошибки не могут быть отправлены, потому что жизненный цикл нисходящего потока уже достиг своего состояния терминала или нисходящий поток отменил последовательность, которая должна была выдать ошибку.

Так что вы можете либо обернуть все внутри try / catch и правильно выдать ошибку:

Observable<Integer> obs = Observable.create(emitter -> {
   try {
      // ...
   } catch (InterruptedException ex) {
      // check if the interrupt is due to cancellation
      // if so, no need to signal the InterruptedException
      if (!disposable.isDisposed()) {
         observer.onError(ex);
      }
   }
});

или настройте потребителя глобальной ошибки, чтобы игнорировать его:

RxJavaPlugins.setErrorHandler(e -> {
    // ..
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    // ...
    Log.warning("Undeliverable exception received, not sure what to do", e);
});
...