Используя p-очередь , я могу ограничить, сколько раз я запускаю какое-то асинхронное действие (например, запросы API) за определенный период времени, и сколько из этих асинхронных действий может выполняться одновременно.
Это прекрасно работает, но я чувствую, что могу сделать то же самое с RxJS .У меня проблемы с выяснением, как это сделать.Я все еще довольно новичок в RxJS, и мне еще предстоит найти примеры, которые делают то, что я пытаюсь сделать.
Я вижу такие операторы, как buffer
и throttleTime
, и похоже, что это путь, но мне трудно собрать всю эту информацию вместе.
Как бы я скопировал p-очередь конфигурация:
{
concurrency: 2 /* at a time */
, intervalCap: 10 /* per every… */
, interval: ( 15 /* seconds */ * 1000 /* milliseconds */)
, carryoverConcurrencyCount: true
}
… с использованием RxJS ?
Решение RxJS должно:
- Немедленно разрешить сквозные значения, когда очередьпустой (т. е. немедленно начать новый интервал, не ждать некоторого интервала, время которого зависит от того, когда очередь в последний раз была непустой)
- Обеспечить те же функции, что и p-queue 's
carryoverConcurrencyCount
: «… задача должна завершиться через заданный интервал или будет перенесена на следующий интервал».
Полный пример с использованием p-queue :
// Queue/Concurrency-limit requests
const PQueue = require('p-queue') ;
const requestQueue = new PQueue({
concurrency: 2 /* at a time */
, intervalCap: 10 /* per every… */
, interval: ( 15 /* seconds */ * 1000 /* milliseconds */)
, carryoverConcurrencyCount: true
}) ;
// From https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/random
function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive
}
const queuePromises = (
[...(Array(20)).keys()]
.map(number => requestQueue.add(() => new Promise(
(resolve, reject) => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */))
))
) ;
queuePromises.forEach(queuePromise => queuePromise.then(
number => console.log(number, 'resolved')
, error => console.error('Individual Promise error', error)
)) ;
Promise.all(queuePromises).then(
numbers => console.log('all are resolved', ...numbers)
, error => console.error('All Promises error', error)
) ;