Угловой 6: Как сделать набор вызовов службы параллельно и обрабатывать данные после того, как все они завершены - PullRequest
0 голосов
/ 02 ноября 2018

Я работаю над веб-приложением Angular 6+, используя rxjs 6+.

Есть набор сервисов (все разные и делают разные http звонки). В основном эти сервисы используются для инициализации приложения.

Мое требование - вызывать эти службы параллельно и ждать, пока все они не будут завершены. После завершения мне нужно обработать полученные ответы. Я много изучал в этой области и обнаружил, что forkJoin и Объединить последние могут быть моими друзьями. И я реализовал то же самое следующим образом:

 getInitialData() {

 this._authService.openCorsConnection().subscribe(res => { 
  forkJoin(
    this._authService.x(),
    this._tService.getSystemDate(),
    this._tService.getTemp(),
    this._tService.getSettings()
  ).pipe(
    catchError( err => {
      console.log('gor error',err);
      if (err.status == 401) {
          // this._router.navigateByUrl('/unauthorize');
          return EMPTY;
      } else {
          return throwError(err);
      }
 }),
    map(([x,y,z,p])=>{
           console.log('sucesss',{x,y,z,p});          
            return {x,y,z,p}
    }),
  finalize(()=>{
        console.log('Executing Finally');
        processData();

  })
  );
});
 }

Но это не исполнение. Вот услуги:

x(): Observable<any> {          
const xUrl  = EnvironmentConfiguration.endPointConfig.xServiceEndpoint;
          let serviceMethod: String = 'x';
          let requestObj: any = this._utilHelperSvc.getRequestObject(xUrl  , { "y": "" }, serviceMethod);
          return this._http.post<any>(requestObj.url,{ "pid": "" } , requestObj)
          .pipe(
            catchError(this.handleError('x', {}))
          );
      }

Нужно ли менять сервисы? Я не могу понять, как решить эту проблему.

Кто-нибудь может предложить, пожалуйста, лучшее решение или другой подход для этого?

Спасибо.

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Я только что сделал несколько дней назад после лота поиска.

Главное, как вы называете список API.

Если вы вызываете API из component.ts , то вы получаете данные в этом. Суть в том, что когда вы вызываете API через службу, вы должны хранить ссылку вызова API.

Я сделал это таким образом ...

var pendingAPIList : any = [];

Вы можете добавить список API в массив с ref из наблюдаемого.

return new Observable(observer => {
    var obj = {
       url:url,
       param:param,
       ref : observer  // <-- Hear is the main trick 
    }
    this.pendingAPIList.push(obj);  // Add multiple API in array
})

Когда все API добавлены, тогда вызовите ...

Observable.forkJoin(this.pendingAPIList).subscribe(response){

     // You will get data in array 
     // Now return all API res in their subscribers at .ts file... 

     for(let i = 0 ; i < this.pendingAPIList.lengh ; i++  ){
        this.pendingAPIList[i].ref.next(response[i]);
        this.pendingAPIList[i].ref.complete();   
     }
}

У меня это нормально работает ...:)

0 голосов
/ 02 ноября 2018

TL; DR

Используйте zip или forkjoin .

Демоверсии:

Пояснение

Вам не нужно ждать, пока forkJoin обработает ответы, это работает наоборот. Вместо этого подготовьте все задания, которые необходимо выполнить, и просто дождитесь завершения.

Вот что я имею в виду:

let process1$ = timer(1000);
let process2$ = timer(2000);
let process3$ = timer(3000);

const doSthg1 = pipe(
  tap(() => console.log('Process 1 has finished')),
  map(() => 'Process 1 has finished')
)

const doSthg2 = pipe(
  tap(() => console.log('Process 2 has finished')),
  map(() => 'Process 2 has finished')
)

const doSthg3 = pipe(
  tap(() => console.log('Process 3 has finished')),
  map(() => 'Process 3 has finished')
)

forkJoin(doSthg1(process1$), doSthg2(process2$), doSthg3(process3$)).subscribe(() => {
  console.log('Now I am complete');
});

Демонстрация с форкомJoin

Это работает, пока ваши процессы не связаны друг с другом, т. Е. Ввод одного не зависит от вывода другого.

почему мой код не работает?

Потому что вы на самом деле не подписаны на forkJoin. Решение: например, вы можете использовать concatMap для преобразования this._authService.openCorsConnection() в другое наблюдаемое:

getInitialData(){
  this._authService.openCorsConnection().pipe(
    concatMap(() => forkJoin(this._authService.x(),
      this._tService.getSystemDate(),
      this._tService.getTemp(),
      this._tService.getSettings()
    )),
    catchError(err => {
      console.log('gor error', err);
      if (err.status == 401) {
        return EMPTY;
      } else {
        return throwError(err);
      }
    }),
  ).subscribe(() => processData());
}

застежка-молния

Вы можете использовать zip вместо forkJoin, если:

  1. все процессы излучают только один раз, а затем завершаются: например, запросы HTTP Get или Post.
  2. вы хотите получить выходы соединения из ваших источников.

реализация:

zip(doSthg1(process1$), doSthg2(process2$), doSthg3(process3$))
.subscribe(
      ([x,y,z]) => {
      console.log(`x=${x}, y=${y}, z=${z}`);
});

Демонстрация на молнии

...