объединениеПоследняя альтернатива для объединения или объединения наблюдаемых - PullRequest
0 голосов
/ 02 июля 2018

У меня сложилось впечатление, что combineLast вначале подходил, но когда я читаю документы, кажется, что это не так: «Имейте в виду, что combineLatest не будет выдавать начальное значение, пока каждое наблюдаемое не выдаст по крайней мере одно значение ».… И, конечно, я попал именно в это исключение. Я пробовал forkJoin и merge в различных комбинациях, но я не могу понять это правильно.

Вариант использования довольно прост, метод someObs возвращает 0 или более наблюдаемых, по которым я зацикливаюсь. Основываясь на значении объекта SomeObs, я помещаю вновь созданную наблюдаемую OtherObs[] в Array из Observable<OtherObs[]>. Этот массив «нужно» объединить в одну наблюдаемую, и прежде чем возвращать его, я бы хотел сделать несколько преобразований.

В частности, мне трудно заменить оператор combineLast на что-то более подходящее ...

public obs(start: string, end: string): Observable<Array<OtherObs>> {
  return this.someObs().pipe(
    mergeMap((someObs: SomeObs[]) => {
      let othObs: Array<Observable<OtherObs[]>> = [];

      someObs.forEach((sobs: SomeObs) => {
        othObs.push(this.yetAnotherObs(sobs));
      });

      return combineLatest<Event[]>(othObs).pipe(
        map(arr => arr.reduce((acc, cur) => acc.concat(cur)))
      );
    })
  );
}

private yetAnotherObs(): Observable<OtherObs[]> {
  /// ...
}

private somObs(): Observable<SomeObs[]> {
  /// ...
}

1 Ответ

0 голосов
/ 05 ноября 2018

«Проблема» combineLatest заключается (как вы сказали) в том, что он «не будет излучать начальное значение, пока каждое наблюдаемое не выдаст хотя бы одно значение». Но это не проблема, потому что вы можете использовать оператор RxJS startWith .

Итак, ваша наблюдаемая получает начальное значение, а combineLatest работает как шарм;)

import { of, combineLatest } from 'rxjs';
import { delay, map, startWith } from 'rxjs/operators';

// Delay to show that this observable needs some time
const delayedObservable$ = of([10, 9, 8]).pipe(delay(5000));

// First Observable
const observable1$ = of([1, 2, 3]);

// Second observable that starts with empty array if no data
const observable2$ = delayedObservable$.pipe(startWith([]));

// Combine observable1$ and observable2$
combineLatest(observable1$, observable2$)
  .pipe(
    map(([one, two]) => {
      // Because we start with an empty array, we already get data from the beginning
      // After 5 seconds we also get data from observable2$
      console.log('observable1$', one);
      console.log('observable2$', two);

      // Concat only to show that we map boths arrays to one array
      return one.concat(two);
    })
  )
  .subscribe(data => {
    // After 0 seconds: [ 1, 2, 3 ]
    // After 5 seconds: [ 1, 2, 3, 10, 9, 8 ]
    console.log(data);
  });

См. Пример по Stackblitz

...