RX JS: пропуск значений из наблюдаемой, вызванный другими наблюдаемыми - PullRequest
3 голосов
/ 23 апреля 2020

Я пытаюсь добиться следующего поведения: у меня есть наблюдаемый источник, который излучает каждую секунду. Я пытаюсь игнорировать значения в течение некоторого периода (10 секунд) времени, если испускается другое наблюдаемое (mySubject) значение. Вот для чего я пришел:

this.source.pipe(
      takeUntil(this.mySubject),
      repeatWhen((observable) => observable.pipe(delay(10000))),
      tap((x) => console.log(x)),
).subscribe();

Теперь он прекращает излучение источника на 10 секунд при каждом mySubject излучении.

Проблема в том, что он мне нужен, если другое излучение mySubject для сброса «счетчика» 10 секунд и игнорирования в течение еще 10 секунд, ничего не испуская при этом.

Как мне этого добиться?

Ответы [ 2 ]

4 голосов
/ 23 апреля 2020

Боюсь, это требует немного более сложного решения:

const ignore$ = this.mySubject.pipe(
  switchMap(() => merge(of(true), of(false).pipe(delay(10 * 1000)))),
);

this.source.pipe(
  withLatestFrom(ignore$),
  filter(([value, ignore]) => !ignore),
  map(([value]) => value),
).subscribe(...);
0 голосов
/ 23 апреля 2020

Это немного сложно, и это (вероятно) не самое простое решение, но вы можете сделать то, что я предлагаю ниже (вы также можете взглянуть на это Stackblitz demo ):

// emits a number starting from 0 in every second
const sourceInterval = interval(1000);

// When semaphore emits, you will wait for two seconds
// The emissions should stop when it emits a true value
// and should resume when it emits a false value (after 
// the minimum waiting interval has passed if it was in
// the middle of one
const semaphore = new Subject<boolean>();

// semaphore will emit in 6 seconds
setTimeout(() => semaphore.next(true), 6000);

sourceInterval
  .pipe(
    // emits an array containing the emission of sourceInterval and semaphore
    withLatestFrom(semaphore.pipe(startWith(false))),

    // exhaustMap maps the value to a inner observable and only accepts new
    //   values when it completes.
    exhaustMap(([sourceIntervalEmission, semaphoreEmission]) =>
      semaphoreEmission
          // this observable completes only after 2 seconds, and release the
          // semaphore before that
        ? new Observable<void>(subscriber => {
            // release the semaphore
            semaphore.next(false);
            // complete the observable after 2 seconds
            setTimeout(() => subscriber.complete(), 2000);
          })
        : of(sourceIntervalEmission) // reemits the sourceInterval value
    )
  ).subscribe(console.log);

// expected output: 0 1 2 3 4 5 * * * 8 9 10...
// on the 6 emission of the sourceInterval, the semaphore emits, so exhaustMap
// gives the control to the `new Observable(...)` that will complete
// only after 2 seconds, releasing the semaphore and the subsequent emissions 
// from the sourceInterval, starting at the 8th emission.
...