RxJS испускает из источника2, если истек срок действия источника1 и когда источник1 возвращается из источника1 - PullRequest
0 голосов
/ 14 февраля 2019

У меня есть наблюдаемый источник source1, который выдает значения, если он не излучает ничего более 2 секунд, я хочу переключиться на резервный источник source2.И если source1 испускает снова, я хочу испускать из него.И так до бесконечности.

Пока у меня есть следующее

import { timeout, catchError, takeUntil, concat } from 'rxjs/operators';

declare const source1: Observable;
declare const source2: Observable;

source1.pipe(
    timeout(2000),
    catchError(() => {
      return source2.pipe(
        takeUntil(source1)
      );
    }),
    concat(source1)
).subscribe(val => console.log(val));

Это почти работает.Если source1 не излучает через 2 секунды, он излучает от source2 до тех пор, пока source1 не излучает снова, а затем переключается на source1.Но есть 2 основных недостатка:

  1. , когда source1 испускает снова, первое испущенное значение «перехватывается» takeUntil (source1 является горячей наблюдаемой) и не будет вconcat(source1)
  2. , если source1 перестанет излучать во второй раз, я хотел бы иметь такое же поведение.С моей реализацией, это работает только один раз.

Есть идеи, как мне это решить?

Ответы [ 2 ]

0 голосов
/ 14 февраля 2019

Решение, которое я нашел, которое решает мои пункты 1 и 2, следующее:

const source1HasStopped = source1.pipe(
  timeout(2000),
  catchError(() => of(1))
);

const fallback = source2.pipe(
  skipUntil(source1HasStopped),
  takeUntil(source1),
  repeat()
);

merge(source1, fallback).subscribe(console.log);

РЕДАКТИРОВАТЬ: К сожалению, это создает утечку подписки, потому что takeUntil не последний ...

0 голосов
/ 14 февраля 2019

Полагаю, вы могли бы сделать это, поделившись source1 и затем используя repeat для повторной подписки на ту же цепочку (я ее не тестировал):

const shared1 = source1.pipe(share());

source1.pipe(
  timeout(2000),
  catchError(() => merge(source1, source2).pipe(
    takeUntil(source1),
  )),
  repeat(),
).subscribe(val => console.log(val));
...