У меня есть наблюдаемый источник source1
, который выдает значения, если он не излучает ничего более 2 секунд, я хочу переключиться на резервный источник source2
.И если source1
испускает снова, я хочу испускать из него.И так до бесконечности.
Пока у меня есть следующее
import { timeout, catchError, takeUntil, concat } from 'rxjs/operators';
declare const source1: Observable;
declare const source2: Observable;
source1.pipe(
timeout(2000),
catchError(() => {
return source2.pipe(
takeUntil(source1)
);
}),
concat(source1)
).subscribe(val => console.log(val));
Это почти работает.Если source1
не излучает через 2 секунды, он излучает от source2
до тех пор, пока source1
не излучает снова, а затем переключается на source1
.Но есть 2 основных недостатка:
- , когда
source1
испускает снова, первое испущенное значение «перехватывается» takeUntil
(source1
является горячей наблюдаемой) и не будет вconcat(source1)
- , если
source1
перестанет излучать во второй раз, я хотел бы иметь такое же поведение.С моей реализацией, это работает только один раз.
Есть идеи, как мне это решить?