Вот последовательность, которая использует субъект для подачи сигнала, когда sendData()
завершена, и регулирует интервал на основе этого сигнала.
Демонстрация ниже имеет интервал 500 мс, но sendData вызывает задержку в 1000 мс.Если вы измените внутреннюю задержку sendData, скажем, 100, то поток будет выполняться с интервалами 500 мс.
console.clear()
const nextPage = new Rx.BehaviorSubject(true);
const interval = 500
const pages = 8
const getItems = () => Rx.Observable.of([1,2,3])
const sendData = () => {
return Rx.Observable.of('x')
.delay(1000)
}
let observable = Rx.Observable.interval(interval)
.throttle(i => nextPage)
.take(pages)
.concatMap(p => getItems())
.concatMap(items => sendData(items))
.do(x => nextPage.next())
const start = Date.now()
const show = (val) => console.log(val, Date.now() - start)
observable.subscribe(show);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.4/Rx.js"></script>
Мой ответ совершенно (ну, не полностью) неверен
Оказывается, что concatMap()
саморегулируется,поэтому тема совершенно не нужна.
Из learn rxjs - concatMap
concatMap не подписывается на следующую наблюдаемую, пока не завершится предыдущая
Вот пример кода без темы.
console.clear()
const interval = 500
const pages = 8
const getItems = () => Rx.Observable.of([1,2,3])
const sendData = () => {
return Rx.Observable.of('x')
.delay(1000)
}
let observable = Rx.Observable.interval(interval)
.take(pages)
.concatMap(p => getItems())
.concatMap(items => sendData(items))
const start = Date.now()
const show = (val) => console.log(val, Date.now() - start)
observable.subscribe(show);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.4/Rx.js"></script>