У меня есть такой код:
Процесс X:
getLocationObservable() // ---> async operation that fetches the location.
// Once location is found(or failed to find) it sends it to this filter :
.filter(location -> {
--- Operation A ---
after finishing the operation A, I either return 'true' and continue
to the next observable which is a Retrofit server call, or simply
return 'false' and quit.
})
.flatMap(location -> getRetrofitServerCallObservable( location )
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Observer<MyCustomResponse>() {
@Override
public void onSubscribe(Disposable d) {
_disposable = d;
}
@Override
public void onNext(MyCustomResponse response) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
В классе Location это выглядит так:
private PublishSubject<Location> locationPublishSubject = PublishSubject.create();
public Observable<Location> getLocationObservable() {
return locationPublishSubject;
}
and then...
locationPublishSubject.onNext( foundLocation );
Проблемы сPublishSubject:
- Если я звоню
locationPublishSubject.onError( new <some mock exception> )
->, он падает с io.reactivex.exceptions.UndeliverableException
- Если я звоню
locationPublishSubject.onComplete
после onNext
-> onNext
не бывает.Он прыгает прямо на onComplete
.