Как объединить предметы и наблюдаемые с угловым rxjs - PullRequest
1 голос
/ 19 июня 2019

У меня есть две подписки:

this.service1.source1.subscribe(countries => {
  this.filteredData =  [];
  countries.forEach(country => {
    this.filteredData = this.data.filter(user => {
      return user.country == country;
    });
  });
});

this.service2.source2.subscribe(companies => {
  companies.forEach(company => {
    this.filteredData =  [];
    this.filteredData = this.data.filter(user => {
      return user.company == company;
    });
  });
});

где source1 и source2 равны BehaviorSubject, и оба события перехватываются subscribe.

Я хочу объединить обе подписки, потому что я хочу фильтровать данные в зависимости от обоих результатов, возвращаемых subscribe

Я пытался forkJoin и zip

zip([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('zip results:', results);
});

forkJoin([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('forkJoin results:', results);
});

Но с этими (zip & forkJoin) я заметил, что я даже не получаю ничего на консоли, как будто subscribe не выполняется, когда генерируются события.

Как объединить результаты обоих источников в одну подписку?

Ответы [ 3 ]

1 голос
/ 19 июня 2019

forkJoin может не подходить для вашего случая использования, поскольку он испускает последнее излученное значение из каждого, когда все наблюдаемые ЗАВЕРШЕНЫ.

zip также может не дать вамжелаемое поведение, потому что Он ждет, пока все входные потоки не выпустили свое n-ое значение, и как только это условие выполнено, он объединяет все эти n-е значения и выдает n-ое объединенное значение.

, поэтому в любом случае вы не получите излучение, пока в обеих наблюдаемых не будет излучения.поскольку this.service1.source1 и this.service1.source2 являются BehaviorSubject с использованием zip гарантирует начальную эмиссию.но более поздние выбросы будут происходить только тогда, когда будут излучаться обе наблюдаемые.

Я предлагаю использовать integraLatest beacuse всякий раз, когда какой-либо входной поток выдает значение, он объединяет последние значения, испускаемые каждым входным потоком..

combineLatest(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('combineLatest results:', results);
});

и, поскольку this.service1.source1 и this.service1.source2 являются BehaviorSubject такими, что:

Вариант субъекта, который требует начальногозначение и выдает свое текущее значение всякий раз, когда он подписан.

гарантируется, что вы получаете испускание, когда вы подписываетесь на него и всякий раз, когда какой-либо из наблюдаемых генерирует значение.

1 голос
/ 20 июня 2019

Как указывалось выше @ysf, если вы хотите отфильтровать локальные данные с обоими результатами при любом излучении, вам следует выбрать combineLatest.

Кроме того, я бы порекомендовал вам создатьпеременная в каждой службе в качестве наблюдаемого экземпляра для BehaviorSubject.Нравится:

// at Service1.service.ts
export class Service1Service {
  // Do not expose Subject directly.
  private source1 = new BehaviorSubject<YourDataType[]>(YourInitialValue);
  private source2 = new BehaviorSubject<YourDataType[]>(YourInitialValue);

  source1$: Observable<YourDataType[]>;
  source2$: Observable<YourDataType[]>;

  constructor() {
    // Create an Observable instance for your sake and open it to the public.
    this.source1$ = this.source1.asObservable();
    this.source2$ = this.source2.asObservable();
  }
}


// at wherever you subscribe
combineLatest(this.service1.source1$, this.service1.source2$).pipe(
  filter(([res1, res2]: YourDataType[]) => {
    // Filter your data with res1, res2
  })
).subscribe();
1 голос
/ 19 июня 2019

Синтаксис неверен.И forkJoin и zip принимают наблюдаемые в качестве аргументов.НЕ массив наблюдаемых.Удалите [] из звонка, и все будет в порядке.

Попробуйте это:

zip(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('zip results:', results);
});

forkJoin(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('forkJoin results:', results);
});
...