Связывание нескольких потоков с помощью rxjava2 - PullRequest
0 голосов
/ 20 июня 2019

Я пытаюсь написать код службы вызовов API RxJava2.

У меня есть несколько API (включая API вызова цикла). Вот мой пример кода, который я хочу.

У меня вопрос, почему onComplete не звонили?


Observable.just("test")
                .flatMapCompletable(new Function<String, CompletableSource>() {
                    @Override
                    public CompletableSource apply(String s) throws Exception {
                        ArrayList<Observable<Void>> arrayList = new ArrayList<>();
                        arrayList.add(Completable.complete().delay(3, TimeUnit.SECONDS).andThen(new Observable<Void>() {
                            @Override
                            protected void subscribeActual(Observer<? super Void> observer) {
                                Log.e("subscribeActual", "onComplete1");
                                observer.onComplete();
                            }
                        }));
                        arrayList.add(Completable.complete().delay(1, TimeUnit.SECONDS).andThen(new Observable<Void>() {
                            @Override
                            protected void subscribeActual(Observer<? super Void> observer) {
                                Log.e("subscribeActual", "onComplete2");
                                observer.onComplete();
                            }
                        }));
                        arrayList.add(Completable.complete().delay(2, TimeUnit.SECONDS).andThen(new Observable<Void>() {
                            @Override
                            protected void subscribeActual(Observer<? super Void> observer) {
                                Log.e("subscribeActual", "onComplete3");
                                observer.onComplete();
                            }
                        }));
                        return Observable.merge(arrayList)
                                .toList()
                                .flatMapCompletable(new Function<List<Void>, CompletableSource>() {
                                    @Override
                                    public CompletableSource apply(List<Void> voids) throws Exception {
                                        return Completable.complete();
                                    }
                                });
                    }
                })
                .andThen(new Observable<Void>() {
                    @Override
                    protected void subscribeActual(Observer<? super Void> observer) {
                        Log.e("subscribeActual", "subscribeActual");
                        observer.onNext(null);
                    }
                })
                .flatMap(new Function<Void, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Void aVoid) throws Exception {
                        Log.e("ObservableSource", "apply");
                        return Observable.just("Hello");
                    }
                })
                .flatMapCompletable(new Function<String, CompletableSource>() {
                    @Override
                    public CompletableSource apply(String s) throws Exception {
                        Log.e("apply", s);
                        return Completable.complete();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onComplete() {
                        Log.e("onComplete", "onComplete");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("onError", "onError " + e.getMessage());
                    }
                });

И результат из журнала

subscribeActual: onComplete2

подписаться Actual: onComplete3

subscribeActual: onComplete1

subscribeActual: subscribeActual

ObservableSource: применимо

применяется: привет

Я ожидаю, что цепочка наблюдаемых завершится с "Void" перед подпиской (Завершено или Observable<Void>).

...