Будет ли rx js merge.complete () закрывать подписки на все свои входные потоки? - PullRequest
3 голосов
/ 12 марта 2020

в следующем случае .pipe (takeUntil (this.unsubscribe)) закроет подписку на все входные потоки?

protected unsubscribe: Subject<void>;

public ngOnDestroy(): void {
   this.unsubscribe.next();
   this.unsubscribe.complete();
}

merge(
  this.productsSelect.selected,
  this.productsSelect.removed,
  this.productsSelect.typed)
  .pipe(takeUntil(this.unsubscribe))
  .subscribe(() => {
    this.detectChanges();
  );

Ответы [ 4 ]

3 голосов
/ 12 марта 2020

takeUntil закроет подписку, когда переданная в нее заметка получит уведомление.

Возьмите этот пример:

private destroyed$ = new Subject();
private sub: Subscription;

ngOnInit() {
  this.sub = merge(
    this.obs1(),
    this.obs2()
  ).pipe(
    takeUntil(this.destroyed$)
  ).subscribe(result => {
    console.log(result);
  }, () => {}, () => {
      console.log('complete');
  });
}

ngOnDestroy() {
  // this.sub is still open
  console.log(this.sub);

  // now close the subscription indirectly
  this.destroyed$.next();
  this.destroyed$.complete();

  // this.sub is now closed
  console.log(this.sub);
}

private obs1(): Observable<any> {
  return timer(0, 1000).pipe(mapTo('Hello, '));
}

private obs2(): Observable<any> {
  return timer(0, 1500).pipe(mapTo('World!'));
}

Пока компонент находится в DOM, две наблюдаемые будет продолжать срабатывать, и подписка на слияние получит значения из timer наблюдаемых.

Когда компонент удаляется из DOM, ngOnDestroy срабатывает. Первый console.log(this.sub) покажет, что подписка открыта. Второй покажет, что он был закрыт.

С консоли:

Object {closed: false, ...}

Object {closed: true, ...}

DEMO: https://stackblitz.com/edit/angular-8c9nkd

1 голос
/ 12 марта 2020

Да, слияние завершено

Сценарий

const finish$ = new Subject();
const source1$ = interval(200).pipe(mapTo('source 1'), takeUntil(finish$))
const source2$ = interval(210).pipe(mapTo('source 2'), takeUntil(finish$))
const source3$ = interval(220).pipe(mapTo('source 3'), takeUntil(finish$))

merge(
  source1$,
  source2$,
  source3$
).subscribe(
  (v) => console.info('next: ', v),
  (e) => console.error(e),
  () => console.warn('complete')
);

setTimeout(() => finish$.next(), 500);

Вывод:

next: source 1
next: source 2
next: source 3
next: source 1
next: source 2
next: source 3
complete

Stackblitz

rx js -заполнение

0 голосов
/ 12 марта 2020

Не думаю ...

takeUntil должен иметь возможность отправлять и получать.

Вы должны сделать это так:

// declare into class
unsubscribe : Subject<any>;

// into constructor
this.unsubscribe = new Subject();

// into your method
merge(
  this.productsSelect.selected,
  this.productsSelect.removed,
  this.productsSelect.typed)
  .pipe(takeUntil(this.unsubscribe))
  .subscribe(() => {
    this.detectChanges();
);

ngOnDestroy(): void {
  this.unsubscribe.next();
  this.unsubscribe.complete();
}
0 голосов
/ 12 марта 2020

Если я правильно понимаю ваш вопрос, тогда да. Оператор takeUntil завершит Наблюдение, поэтому не останется никаких подписок на внутренние Наблюдаемые.

...