В RxJS, как запретить операторам выдавать значения, если наблюдаемый источник не испустил ничего? - PullRequest
3 голосов
/ 23 мая 2019

У меня есть три наблюдаемые foo$, bar$ и baz$, которые я объединяю, чтобы сформировать другую наблюдаемую.Это работает, как и ожидалось:

  • Поток начинается с >>>
  • Каждое значение отправляется одно за другим
  • Поток заканчивается <<<

const foo$ = of('foo');
const bar$ = of('bar');
const baz$ = of('baz');

merge(foo$, bar$, baz$).pipe(startWith('>>>'), endWith('<<<')).subscribe(str => {
  console.log(str);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
<script>const {merge, of} = rxjs; const {startWith, endWith} = rxjs.operators;</script>

Теперь, если ни одна из трех вышеприведенных наблюдаемых не выдает значения, я не хочу выводить ни >>>, ни <<<.Таким образом, startWith и endWith могут "запускаться" только в том случае, если merge(foo$, bar$, baz$) действительно излучает значение.

Для того, чтобы смоделировать, что я отклоняю все значения, испускаемые foo$, bar$ и baz$ с функцией фильтрации.

const foo$ = of('foo').pipe(filter(() => false));
const bar$ = of('bar').pipe(filter(() => false));
const baz$ = of('baz').pipe(filter(() => false));

merge(foo$, bar$, baz$).pipe(startWith('>>>'), endWith('<<<')).subscribe(str => {
  console.log(str);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
<script>const {merge, of} = rxjs; const {startWith, endWith, filter} = rxjs.operators</script>

Однако, как вы можете видеть на выходе, и startWith, и endWith выдали свое значение, хотя merge() не произвело никакого.

Вопрос : Как я могу предотвратить выполнение startWith и endWith, если наблюдаемое не испустило ни одного значения?

Ответы [ 5 ]

2 голосов
/ 23 мая 2019

Первое условие простое.Вы можете просто добавить к первой эмиссии concatMap:

mergeMap((v, index) => index === 0 ? of('>>>', v) : of(v))

Второе условие более сложное.Вы хотите в основном прямо противоположное defaultIfEmpty.Я не могу придумать ни одного простого решения, так что я, вероятно, все равно буду использовать endWith и просто проигнорирую излучение, если это первое и единственное излучение (что означает, что источник только что закончил, ничего не излучая):

endWith('<<<'),
filter((v, index) => index !== 0 || v !== '<<<'),

Полный пример:

const foo$ = of('foo');//.pipe(filter(() => false));
const bar$ = of('bar');//).pipe(filter(() => false));
const baz$ = of('baz');//.pipe(filter(() => false));

merge(foo$, bar$, baz$).pipe(
  mergeMap((v, index) => index === 0 ? of('>>>', v) : of(v)),
  endWith('<<<'),
  filter((v, index) => index !== 0 || v !== '<<<'),
).subscribe(console.log);

Демонстрация в реальном времени: https://stackblitz.com/edit/rxjs-n2alkc?file=index.ts

1 голос
/ 23 мая 2019

Вы можете назначить объединенную наблюдаемую переменную, взять первый элемент из потока и затем отобразить наблюдаемую, которую вы хотите выполнить, когда было выпущено хотя бы одно значение.

const foo$ = of('foo').pipe(filter(() => false))
const bar$ = of('bar').pipe(filter(() => false))
const baz$ = of('baz').pipe(filter(() => false))

const merged$ = merge(foo$, bar$, baz$);
merged$.pipe(
  take(1),
  switchMap(() => merged$.pipe(
    startWith('>>>'),
    endWith('<<<')
  ))
).subscribe(console.log);

https://stackblitz.com/edit/rxjs-phspsc

1 голос
/ 23 мая 2019

Из документации startWith:

Возвращает Observable, который испускает элементы, которые вы указали в качестве аргументов, прежде чем он начнет испускать элементы, испускаемые исходной Observable.

Так что startWith и endWith всегда будут работать.

Я не знаю, каким должен быть ваш ожидаемый результат, но если вы хотите объединить строки только для каждого переданного значения, вы можете использовать операторы map или switchMap.

EDIT: Пример с map для конкататации каждого значения:

const foo$ = of('foo').pipe(filter(() => false));
const bar$ = of('bar').pipe(filter(() => false));
const baz$ = of('baz').pipe(filter(() => false));

merge(foo$, bar$, baz$).pipe(map(v => `>>>${v}<<<`)).subscribe(str => {
  console.log(str);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
<script>const {merge, of} = rxjs; const {startWith, endWith, filter, map} = rxjs.operators</script>
0 голосов
/ 06 июня 2019

Я вижу, что все ответы сделаны объединение операторов , но сделать это решение с использованием только операторов довольно сложно, почему бы просто создать свою собственную наблюдаемую ?Я думаю, что это решение является самым простым для понимания. Без скрытого умения , без сложных комбинаций операторов, без повторений подписки ...

Решение:

const foo$ = of('foo')//.pipe(filter(() => false));
const bar$ = of('bar')//.pipe(filter(() => false));
const baz$ = of('baz')//.pipe(filter(() => false));

function wrapIfOnEmit$(...obs) {
  return new Observable(observer => {
    let hasEmitted;
    const subscription = merge(...obs).subscribe((data) => {
      if (!hasEmitted) {
        observer.next('>>>');
        hasEmitted = true;
      }
      observer.next(data);
    },
    (error) => observer.error(error),
    () => {
      if (hasEmitted) observer.next('<<<');
      observer.complete();
    })
    return () => subscription.unsubscribe();
  })
}

wrapIfOnEmit$(foo$, bar$, baz$).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.min.js"></script>
<script>const {merge, of, Observable} = rxjs; const {filter} = rxjs.operators</script>

Надеюсь, это поможет!

0 голосов
/ 23 мая 2019

Одним из способов достижения этого является использование операторов материализации-дематериализации пары:

source$.pipe(
  // turn all events on stream into Notifications
  materialize(),

  // wrap elements only if they are present
  switchMap((event, index) => {
    // if its first event and its a value
    if (index === 0 && event.kind === 'N') {
      const startingNotification = new Notification('N', '>>>', undefined);
      return of(startingNotification, event);
    }

    // if its a completion event and it not a first event
    if (index > 0 && event.kind === 'C') {
      const endingNotification = new Notification('N', '<<<', undefined);
      return of(endingNotification, event);
    }

    return of(event);
  }),

  // turn Notifications back to events on stream
  dematerialize()
)

Wrap source emission only if its not empty

Играйте с этим кодом на игровой площадке:
https://thinkrx.io/gist/5a7a7f1338737e452ff6a1937b5fe05a
для удобства я добавил туда пустой источник и источник ошибок

Надеюсь, это поможет

...