Регулирование асинхронных запросов с помощью RxJS - PullRequest
0 голосов
/ 25 апреля 2018

Мне нужно отправить большой объем данных в службу.Существует ограничение на количество элементов, которые мне разрешено отправлять за минуту, и мне нужно отправлять не более 350 элементов на запрос, поэтому я разбиваю данные на страницы и пытаюсь использовать Observable для регулирования запросов:

const maxItemsPerRequest = 350;
const interval = Math.round(60 /*seconds*/ / maxPerMinute * 1000);
const pages = Math.ceil(totalItems / maxItemsPerRequest);

let observable = Observable.interval(interval).take(pages);

observable.subscribe(async page => {
  const items = await getItems(page, maxItemsPerRequest);
  await this.sendData(items);
});

getItems и sendData может занять некоторое время для завершения, поэтому лимит запросов может быть превышен, например, на второй минуте (если на запросы, созданные в первую минуту, уходит более 60 секунд).

Как я могу убедиться, что подписчик ждет не менее interval миллисекунд после завершения предыдущего запроса перед отправкой нового запроса?

Ответы [ 2 ]

0 голосов
/ 26 апреля 2018

Вот последовательность, которая использует субъект для подачи сигнала, когда sendData() завершена, и регулирует интервал на основе этого сигнала.

Демонстрация ниже имеет интервал 500 мс, но sendData вызывает задержку в 1000 мс.Если вы измените внутреннюю задержку sendData, скажем, 100, то поток будет выполняться с интервалами 500 мс.

console.clear()

const nextPage = new Rx.BehaviorSubject(true);

const interval = 500 
const pages = 8

const getItems = () => Rx.Observable.of([1,2,3])

const sendData = () => {
  return Rx.Observable.of('x')
    .delay(1000)
}

let observable = Rx.Observable.interval(interval)
  .throttle(i => nextPage)
  .take(pages)
  .concatMap(p => getItems())
  .concatMap(items => sendData(items))
  .do(x => nextPage.next())

const start = Date.now()
const show = (val) => console.log(val, Date.now() - start)
observable.subscribe(show);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.4/Rx.js"></script>

Мой ответ совершенно (ну, не полностью) неверен

Оказывается, что concatMap() саморегулируется,поэтому тема совершенно не нужна.

Из learn rxjs - concatMap

concatMap не подписывается на следующую наблюдаемую, пока не завершится предыдущая

Вот пример кода без темы.

console.clear()

const interval = 500 
const pages = 8

const getItems = () => Rx.Observable.of([1,2,3])

const sendData = () => {
  return Rx.Observable.of('x')
    .delay(1000)
}

let observable = Rx.Observable.interval(interval)
  .take(pages)
  .concatMap(p => getItems())
  .concatMap(items => sendData(items))

const start = Date.now()
const show = (val) => console.log(val, Date.now() - start)
observable.subscribe(show);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.4/Rx.js"></script>
0 голосов
/ 26 апреля 2018

По сути, вы не хотите выполнять действие внутри subscribe, потому что там у вас нет контроля над его завершением.Вместо этого вы хотите сделать его частью цепочки, и задержка начнется с конца предыдущего вызова.

Например, вы можете сделать это так:

let observable = Observable.range(pages)
  .concatMap(page => getItems(page, maxItemsPerRequest)
    .concatMap(items => this.sendData(items))
    .delay(interval) // delay the emission from this Observable
  )
  .subscribe(console.log);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...