В приведенном ниже коде, отсекаемом при вызове dispose()
, поток эмиттера прерывается (InterruptedException
выбрасывается из режима ожидания).
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
Из сеанса отладки я вижу, что прерывание происходит от FutureTask
, который отменяется во время удаления. Там поток, вызывающий dispose()
, проверяется на поток бегуна, и, если он не совпадает, эмиттер прерывается. Поток отличается, так как я использовал вычисления Scheduler
.
Есть ли способ заставить распоряжение не прерывать такой излучатель, или это так, как это всегда должно быть обработано? Проблема, с которой я сталкиваюсь при таком подходе, заключается в том, что у меня была бы прерываемая операция (смоделированная здесь сном), которую я хотел бы выполнить нормально перед вызовом onComplete()
.