Подписчик onNext вызывается до завершения асинхронных запросов в rxjava2. - PullRequest
0 голосов
/ 07 сентября 2018

Я реализовал шаблон хранилища в MVP, используя RxJava2

RemoteDataSource.java

public Observable<List<A>> getAList(){
          return ApiService.
                getAList()
                .compose(RxUtils.applySchedulers())
                .doOnSubscribe(disposable -> Timber.d(..))
                .doOnError(throwable -> Timber.d(..))
                .doOnComplete(() -> {
                    Timber.d(..);
                });
      }

LocalDataSource.java

public Observable<List<A>> getAList(){
    return mDbHelper .....from SQLBrite..
}

public void saveAList(List<<A> a){
    SQlBriteTransaction...
}

Repository.java (обновление)

  @Inject
  public Repository(DownloadUtils downloadUtils){
   this.mDownloadUtils = downloadUtils;
   }

   @Override
   public Observable<List<A>> getAList(){
     return  mRemoteDataSource
            .getAList()
             .flatMapIterable(List<A> -> a)
            .flatMap(A a ->
      ************************************************************   
               return Observable.fromIterable(a.getB())
                     .flatMap((Function<B, ObservableSource<B>>) b ->
                                       Observable.create(emitter -> 
                                                emitter.onNext(new 
            DownloadUtils().downloadFiles(b,totalListCount,emitter))))
                                .toList()
                                .toObservable()

     ***************************************************
                    .toList()
                    .toObservable()
                    .doOnNext( List<A>  a -> {

     --------------Only the first change in B value is inserted in Db-
                       mLocalDataSource.saveAList(a);
                    });
    }

DownLoadUtils.java (обновление)

void downloadBFiles(B b, int totalCount,ObservableEmitter<B> emitter){
        fileCount = b.size;
         b.get(index).setDataToChange(dataToChange);

         *** I am using PR Downloader for aynchronous download using 
            RECURSION **
        PRDownloader.download(remoteUrl, filePath, fileName)
                .build()
        .setOnStartOrResumeListener(() -> {
            })
            .setOnProgressListener(progress -> {
                int progressPercent = (int) (progress.currentBytes * 
                   100 / progress.totalBytes);,

             })
            .start(new OnDownloadListener() {
                @Override
                public void onDownloadComplete() {
  ********************* emitter.onComplete() ******************


             @Override
                public void onError(Error error) {
                  }   
     }

Presenter.java

void getVideosFromRepo(){

    disposable = mRepository
                 .getAList()
                 .doOnSubscribe(d _-> "Started Loading")
                 .subscribe(
                   //OnNext
    ------------- Here the OnNext is being called before Asynchronous Operation completes!!-------

                    List<A> a -> mView.setAList(a);
                   )


}

Над реализацией презентатора возвращает список в onNext Presenter даже до завершения асинхронной загрузки ... какие изменения необходимы, так что onNext (подписка) вызывается после все загрузки завершены . !!!

1 Ответ

0 голосов
/ 07 сентября 2018

Вы используете асинхронные службы вне цепочки наблюдателей RxJava, поэтому RxJava не имеет возможности управлять передаваемыми данными. Поскольку downloadBFiles() использует отдельную цепочку наблюдателей, вы, так сказать, потеряли нить.

Вместо того, чтобы использовать doOnNext() для запуска загрузки, вам нужно будет использовать flatMap(), чтобы результат загрузки был включен в цепочку наблюдателей.

...