RXJS Объединение нескольких наблюдаемых внутри трубы - PullRequest
2 голосов
/ 17 июня 2019

У меня есть вызов API, который возвращает определенное количество идентификаторов.Каждый из этих идентификаторов используется для создания нового вызова API.Результаты этих вызовов API нужно объединить в один объект.

Сначала я использовал цикл внутри оператора .pipe (map) первого вызова API.В этом цикле я делал вторые вызовы API, и в операторе .pipe (map) в каждом из этих вызовов я редактировал переменную в своем угловом компоненте.

Это было не очень красиво, и я былна самом деле интересно, если это потокобезопасный.Я знаю, что javascript является однопоточным, но кажется не очень безопасным иметь несколько асинхронных процессов, связанных с одной и той же глобальной переменной.

, после чего я просто сохранил наблюдаемую информацию, возвращаемую вторым вызовом API в массивеперебирая возвращенные идентификаторы с помощью apiCall1, и использовал forkJoin для подписки и обработки каждого результата соответствующим образом (см. пример).

Однако это не очень красиво, и мне было интересно, есть ли оператор, который я могу использовать в моемтруба для этого?

Так что вместо (псевдокод):

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

        forkJoin(...observables).subscribe(dataArray) => {
          for (data of dataArray) {
            //Do something
          }
        });

      }),
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

Есть ли оператор, который делает это примерно так:

  .pipe(
      map(ids=> {

        let observables = []
        for (const id of ids) {
         observables.push(this.service.getSomeStuff(id));
        }

      return observables
      }),
      forkJoin(dataArray => {
          for (data of dataArray) {
            //Do something
          }
        });
      takeWhile(() => this.componentActive),
      catchError(error => {
        console.log(error);
        return throwError(error);
      })
    )
    .subscribe();

Ответы [ 3 ]

2 голосов
/ 17 июня 2019

Вот что вы можете сделать:

sourceObservable$.pipe(
  // depends on your need here you can use mergeMap as well
  switchMap(ids => {
    const observables = ids.map(i => this.service.getSomeStuff(id));
    return forkJoin(observables);
  }),
  tap(joined => {
    // joined will be an array of values of the observables in the same
    // order as you pushed in forkJoin
    for (data of joined) {
      // do something
    }
  }),
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  })
)
.subscribe();
0 голосов
/ 18 июня 2019
combineLatest(...observables)

будет испускаться только после того, как все наблюдаемые испущены, и вы получите массив результатов.

0 голосов
/ 18 июня 2019

Вместо того, чтобы изменять ваши данные сразу, используя for, почему бы вам не сделать это по мере получения данных? Как то так.

source$.pipe(
  mergeMap(ids => from(ids)),
  mergeMap(id => this.service.getSomeStuff(id)),
  tap(data => //do someting with the data);
  takeWhile(() => this.componentActive),
  catchError(error => {
    console.log(error);
    return throwError(error);
  }),
  toArray(), --> this create the array of all the values received.
)
.subscribe(data => //returns array of modified data);
...