Наблюдаемая функция merge () определяет, какая наблюдаемая активирована - PullRequest
0 голосов
/ 04 июля 2018

Я создаю список Observable, используя список значений, для каждого значения пользовательский Observable. Я запускаю их все с помощью слияния, но не могу определить, какой из них запускает onNext() или onError()

Как в коде ниже:

 List<Observable<MyHttpRsObj>> observables = new ArrayList<>();

    for (String param : paramsList) {
        Observable<MyHttpRsObj> objObservable = MyRestClient.get().doHttpRequest(param);
        observables.add(fileUploadObservable);
    }

    Observable<BaseRs> combinedObservables = Observable.merge(observables);

    combinedObservables.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    //called only once when all Observables finished
                }

                @Override
                public void onError(Throwable throwable) {
                    //how to know which Observable has error (which param)
                }


                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {
                    //how to know which Observable has sccess  (which param)
                }
            });

1 Ответ

0 голосов
/ 04 июля 2018

Невозможно узнать, какая из наблюдаемых вызвала ошибку, поскольку вы объединяете все наблюдаемые в одну.

Лучше всего использовать одного наблюдателя для каждого наблюдаемого. И последний для объединенного Observable.

Как это:

 List<Observable<MyHttpRsObj>> observables = new ArrayList<>();

    for (String param : paramsList) {
        //change to connectable Observable
        ConnectableObservable<MyHttpRsObj> objObservable = MyRestClient.get()
                 .doHttpRequest(param)
                 .publish();

       //don't forget to connect
        observable.connect();
        observables.add(fileUploadObservable);

        //subscribe for each observable
        objObservable.observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<MyHttpRsObj>() {
                    @Override
                    public void onCompleted() {
                        //just partial completed
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        //you can access param from here

                    }


                    @Override
                    public void onNext(MyHttpRsObj myHttpRsObj) {
                        //access onNext here
                        //you can access param from here
                    }
                });
    }

    Observable<BaseRs> combinedObservables = Observable.merge(observables);

    combinedObservables.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<MyHttpRsObj>() {
                @Override
                public void onCompleted() {
                    //called only once when all Observables finished
                }

                @Override
                public void onError(Throwable throwable) {
                    //don't handle error here
                }


                @Override
                public void onNext(MyHttpRsObj myHttpRsObj) {

                }
            });

PS : используйте ConnectableObservable , чтобы избежать излучения дважды

...