подождите, пока не появится второй наблюдаемый - PullRequest
0 голосов
/ 21 марта 2020

В Rx js есть канал takeUntil , но нет канала wait До , что делает текущую наблюдаемую ожидающую ожидания secondde Observable to emit.

Моя последняя цель - заставить многих Observable все еще ждать, пока мой Observable init $ не выдаст только одно значение, чтобы продолжить их выполнение. Так что мой Observable init $ должен быть выполнен один раз, и до этого мой другой наблюдаемый должен ждать, пока inits не выдаст любое значение, отличное от null.

В этом простом примере я хочу добавить канал к pipedSource делает ждать до init $, поэтому источник должен ждать, пока init $ не выдаст свое значение.

import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';

const init$ = new BehaviorSubject(null);

const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
    skipWhile((res)=> res === null)
    //wait until init$ emits a non null value
)

//first subscription to source
pipedSource.subscribe(val => console.log(val));

source.next({profile:"me"});

//init emits once
setTimeout(()=>{
  init$.next(1);
},2000);

// a second subscription to source
setTimeout(()=>{
  pipedSource.subscribe(val => console.log(val));
},3000);

требуемый результат:

//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile" 

Ответы [ 2 ]

1 голос
/ 21 марта 2020

Вы хотите запустить вторую наблюдаемую, когда первая наблюдаемая испускает ненулевое значение. Чтобы сделать это, используйте concatMap или switchMap после skipWhile.

ngOnInit() {
  const init$ = new BehaviorSubject(null);

  const source = new BehaviorSubject({profile:"me"});
  const pipedSource = init$
  .pipe(
      skipWhile((res)=> res === null),
      concatMap(() => source)
  );

  pipedSource.subscribe(val => console.log('first', val));

  //init emits once
  setTimeout(()=>{
    init$.next(1);
  },2000);

  // a second subscription to source
  setTimeout(()=>{
    pipedSource.subscribe(val => console.log('second', val));
  }, 3000);
}

Здесь я подписываюсь на наблюдаемую init$ в первую очередь, ожидая, пока она не выдаст ненулевое значение, и затем переключение на source наблюдаемый.

ДЕМО: https://stackblitz.com/edit/angular-p7kftd

0 голосов
/ 21 марта 2020

Если я правильно понял ваш вопрос, я вижу 2 возможных случая.

Первый - это то, что ваш source Observable начинает испускать свои значения независимо от wait$. Когда wait$ излучает, только тогда вы начинаете использовать значения, излучаемые source. Это поведение может быть реализовано с помощью функции combineLatest, подобной этой

//emits value every 500ms
const source$ = interval(500);

combineLatest(source$, wait$)
    .pipe(
        map(([s, v]) => s), // to filter out the value emitted by wait$
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait$.next(1);
}, 2000);

. В этом случае на консоли выводится последовательность, начинающаяся с 2, поскольку wait$ излучается через 2 секунды.

Во втором случае вы хотите, чтобы source начал излучать свои значения только после того, как wait$ испустил. В этом случае вы можете использовать оператор switchMap, например здесь

const wait_2$ = new Subject();

//emits value every 500ms
const source_2$ = interval(500);

wait_2$
    .pipe(
        switchMap(() => source_2$),
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait_2$.next(1);
}, 4000);

В этом случае через 4 секунды на консоли печатается последовательность, начинающаяся с 0.

...