Как я понял из документации, использование singel.observeOn (Scheduler) гарантирует, что любое событие downstream будет выполнено в этом планировщике.
Очевидно, onError вызвал тот же планировщик, который выдал ошибку, когда я получил эту ошибку -
> Caused by:java.lang.IllegalStateException: Cannot invoke setValue on a background thread
at androidx.lifecycle.LiveData.assertMainThread(LiveData.java:443)
at androidx.lifecycle.LiveData.setValue(LiveData.java:286)
at androidx.lifecycle.MutableLiveData.setValue(MutableLiveData.java:33)
at com.bonimoo.womlauncher.presentation.wizard.registration.RegistrationViewModel$1.onError(RegistrationViewModel.java:89)
at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onError(SingleFlatMap.java:116)
at io.reactivex.internal.observers.ResumeSingleObserver.onError(ResumeSingleObserver.java:51)
at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:78)
at io.reactivex.internal.operators.single.SingleError.subscribeActual(SingleError.java:42)
at io.reactivex.Single.subscribe(Single.java:3603)
из этого кода -
public static<T> SingleTransformer<T,T> getSingleTransformer(){
return upstream -> upstream
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
GetHotelsList getHotelsList = new GetHotelsList(
AsyncTransformers.getSingleTransformer(), networkRepo);
getHotelsList.getHotels()
.map(hotels->
CollectionsUtil.mapList(hotels,
RegistrationMappers::mapHotelToPresentationHotel)
)
.flatMap((Function<List<HotelPresentation>, SingleSource<List<HotelPresentation>>>) hotelPresentations ->
Completable.timer(5,TimeUnit.SECONDS, Schedulers.io())
.andThen(Single.error(new Throwable()))
)
.subscribe(new SingleObserver<List<HotelPresentation>>() {
@Override
public void onSubscribe(Disposable d) {
addDisposable(d);
RegistrationState currentState = stateLiveData.getValue();
stateLiveData.setValue(currentState.newBuilder().setLoadingHotels(true).build());
}
@Override
public void onSuccess(List<HotelPresentation> hotelPresentations) {
RegistrationState currentState = stateLiveData.getValue();
stateLiveData.setValue(currentState.newBuilder().setHotelsList(hotelPresentations).setLoadingHotels(false).build());
}
@Override
public void onError(Throwable e) {
RegistrationState currentState = stateLiveData.getValue();
stateLiveData.setValue(currentState.newBuilder().setLoadingHotels(false).build());
}
});
и отладчик показал, что onError вызвал RxSchedulerIoThread.