Делать части трубы RX js только для определенных выбросов наблюдаемых - PullRequest
2 голосов
/ 25 февраля 2020

У меня есть rx js -Об наблюдаемый, на который я подписываюсь. Теперь у меня есть две разные потребности, которые нужно обработать в канале Observable.

Сначала я хочу отобразить вывод. Во-вторых, я хочу использовать tap для запуска побочного эффекта, но побочный эффект не должен запускаться при первом излучении.

Так что это, очевидно, не работает, потому что skip работает глобально на канале:

this.userChangeSubscription = this.userStateService.userState$
  .pipe(
    map(userState => userState.prop),
    skip(1),
    tap(() => this.sideEffect())
   )
  .subscribe();

Есть ли способ сделать это, не подписываясь на Observable дважды?

edit : Хорошо, теперь у меня есть несколько вариантов, которые кажется, все работает. Теперь: какой выбрать?

Ответы [ 3 ]

1 голос
/ 04 марта 2020

Я видел, что уже есть рабочие ответы, но я думаю, что вам лучше всего написать свой собственный оператор rx js для этого варианта использования. Это приводит к чистому решению, и ваша функция pipe проясняет ваши намерения.

Для этого нам нужно использовать defer наблюдаемый :

function tapSkipFirst<T>(fn: Function): OperatorFunction<T, T> {
  return function(source: Observable<any>) {
    return defer(() => {
      let skip = true;
      return source.pipe(
        tap((v?:any) => {
          if (!skip) {
            fn(v);
          }
          skip = false;
        })
      );
    });
  };
}

Мы используем переменную skip, чтобы решить, запустить ли функцию побочного эффекта или нет. В конце первого прогона мы переключаем skip на false и, следовательно, запускаем функцию побочного эффекта для всех последующих прогонов. Нам нужно использовать наблюдаемую здесь defer, потому что код внутри defer вызывается только при подписке, а не при создании. Это важно, потому что в противном случае все подписки будут иметь одну и ту же переменную skip.

И теперь вы можете легко использовать новый пользовательский оператор, например:

this.userChangeSubscription = this.userStateService.userState$
  .pipe(
    map(userState => userState.prop),
    tapSkipFirst(() => this.sideEffect())
   )
  .subscribe();
0 голосов
/ 26 февраля 2020

Вы можете использовать оператор switchMap для такого случая использования. Он дает вам параметр index, позволяющий вам выполнять различные действия, основываясь на том, что излучение является первым, вторым, третьим и т. Д. c ..

В этом случае sideEffect не будет запущено, если index равно 0 (первая эмиссия), но будет запускаться для любых других эмиссий фьючерсов.

Я использую этот оператор для запуска побочных эффектов или изменения значения, испускаемого на основе условия. Это здорово, потому что вам не нужно разбивать исходную Наблюдаемую на две разные Наблюдаемые, имеющие другой конвейер.

this.userChangeSubscription = this.userStateService.userState$
  .pipe(
    map(userState => userState.prop),
    switchMap((value, index) => {
      return index > 0
        ? of(value).pipe(tap(() => this.sideEffect()))
        : of(value);
    })
  )
.subscribe();
0 голосов
/ 25 февраля 2020

Да, это возможно. Пример:

const userProp$ = this.userStateService.userState$.pipe(
  map(userState => userState.prop),
  shareReplay({ bufferSize: 1, refCount: true })
);

Здесь shareReplay позволит вам разделить этот ресурс между несколькими подписчиками без повторного запуска всей цепочки. Как только все подписчики закончат прослушивание, shareReplay завершится, и наблюдаемое выше закончится.

Затем вы можете подписаться столько раз, сколько захотите, а также изолировать побочный эффект, который является хорошей идеей в первом место:

userProp$
  .pipe(
    skip(1),
    tap(() => this.sideEffect())
  )
  .subscribe();
...