Это решение будет работать, когда наблюдаемая горячая (и без refCount
):
streamA
.takeUntil(streamB)
.skip(1)
.repeat()
.merge(streamA.take(1))
.subscribe(console.log);
.takeUntil(streamB)
: сделать поток A
завершенным после потока B
, получая значение. .skip(1)
: сделать поток A
пропустить одно значение при запуске (или в результате .repeat()
). .repeat()
: сделать поток A
повтор (переподключение) неопределенно. .merge(streamA.take(1))
: смещение эффекта .skip(1)
в начале потока.
Пример создания пропуска потока каждые 5 секунд:
var streamA,
streamB;
streamA = Rx.Observable
.interval(1000)
.map(function (x) {
return 'A:' + x;
}).publish();
streamB = Rx.Observable
.interval(5000);
streamA
.takeUntil(streamB)
.skip(1)
.repeat()
.merge(streamA.take(1))
.subscribe(console.log);
streamA.connect();
Вы также можете использовать эту изолированную программную среду http://jsbin.com/gijorid/4/edit?js,console для выполнения BACTION()
в журнале консоли во время выполнения кода для ручного перевода значения в streamB
(что полезно для анализа кода).