Я пытаюсь написать код службы вызовов API RxJava2.
У меня есть несколько API (включая API вызова цикла).
Вот мой пример кода, который я хочу.
У меня вопрос, почему onComplete
не звонили?
Observable.just("test")
.flatMapCompletable(new Function<String, CompletableSource>() {
@Override
public CompletableSource apply(String s) throws Exception {
ArrayList<Observable<Void>> arrayList = new ArrayList<>();
arrayList.add(Completable.complete().delay(3, TimeUnit.SECONDS).andThen(new Observable<Void>() {
@Override
protected void subscribeActual(Observer<? super Void> observer) {
Log.e("subscribeActual", "onComplete1");
observer.onComplete();
}
}));
arrayList.add(Completable.complete().delay(1, TimeUnit.SECONDS).andThen(new Observable<Void>() {
@Override
protected void subscribeActual(Observer<? super Void> observer) {
Log.e("subscribeActual", "onComplete2");
observer.onComplete();
}
}));
arrayList.add(Completable.complete().delay(2, TimeUnit.SECONDS).andThen(new Observable<Void>() {
@Override
protected void subscribeActual(Observer<? super Void> observer) {
Log.e("subscribeActual", "onComplete3");
observer.onComplete();
}
}));
return Observable.merge(arrayList)
.toList()
.flatMapCompletable(new Function<List<Void>, CompletableSource>() {
@Override
public CompletableSource apply(List<Void> voids) throws Exception {
return Completable.complete();
}
});
}
})
.andThen(new Observable<Void>() {
@Override
protected void subscribeActual(Observer<? super Void> observer) {
Log.e("subscribeActual", "subscribeActual");
observer.onNext(null);
}
})
.flatMap(new Function<Void, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Void aVoid) throws Exception {
Log.e("ObservableSource", "apply");
return Observable.just("Hello");
}
})
.flatMapCompletable(new Function<String, CompletableSource>() {
@Override
public CompletableSource apply(String s) throws Exception {
Log.e("apply", s);
return Completable.complete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
Log.e("onComplete", "onComplete");
}
@Override
public void onError(Throwable e) {
Log.e("onError", "onError " + e.getMessage());
}
});
И результат из журнала
subscribeActual: onComplete2
подписаться Actual: onComplete3
subscribeActual: onComplete1
subscribeActual: subscribeActual
ObservableSource: применимо
применяется: привет
Я ожидаю, что цепочка наблюдаемых завершится с "Void" перед подпиской (Завершено или Observable<Void>
).