Отправлять значения в очередь с помощью RxJS, как это можно сделать с помощью p-queue - PullRequest
0 голосов
/ 29 сентября 2018

Используя p-очередь , я могу ограничить, сколько раз я запускаю какое-то асинхронное действие (например, запросы API) за определенный период времени, и сколько из этих асинхронных действий может выполняться одновременно.

Это прекрасно работает, но я чувствую, что могу сделать то же самое с RxJS .У меня проблемы с выяснением, как это сделать.Я все еще довольно новичок в RxJS, и мне еще предстоит найти примеры, которые делают то, что я пытаюсь сделать.

Я вижу такие операторы, как buffer и throttleTime, и похоже, что это путь, но мне трудно собрать всю эту информацию вместе.

Как бы я скопировал p-очередь конфигурация:

{
    concurrency:    2 /* at a time */
    , intervalCap: 10 /* per every… */
    , interval: (  15 /* seconds */ * 1000 /* milliseconds */)
    , carryoverConcurrencyCount: true
}

… с использованием RxJS ?

Решение RxJS должно:

  • Немедленно разрешить сквозные значения, когда очередьпустой (т. е. немедленно начать новый интервал, не ждать некоторого интервала, время которого зависит от того, когда очередь в последний раз была непустой)
  • Обеспечить те же функции, что и p-queue 's carryoverConcurrencyCount: «… задача должна завершиться через заданный интервал или будет перенесена на следующий интервал».

Полный пример с использованием p-queue :

// Queue/Concurrency-limit requests
const PQueue = require('p-queue') ;
const requestQueue = new PQueue({
    concurrency:    2 /* at a time */
    , intervalCap: 10 /* per every… */
    , interval: (  15 /* seconds */ * 1000 /* milliseconds */)
    , carryoverConcurrencyCount: true
}) ;

// From https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/random
function getRandomInt(min, max) {
  min = Math.ceil(min);
  max = Math.floor(max);
  return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive
}

const queuePromises = (
    [...(Array(20)).keys()]
    .map(number => requestQueue.add(() => new Promise(
        (resolve, reject) => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */))
    ))
) ;

queuePromises.forEach(queuePromise => queuePromise.then(
    number => console.log(number, 'resolved') 
    , error => console.error('Individual Promise error', error)
)) ;

Promise.all(queuePromises).then(
    numbers => console.log('all are resolved', ...numbers)
    , error => console.error('All Promises error', error)
) ;

1 Ответ

0 голосов
/ 29 сентября 2018

Я не знаю p-очередь , но, вероятно, вы могли бы обратиться к оператору mergeMap, чтобы выполнить то, что вам нужно, и в частности к параметру concurrency mergeMap.С помощью параметра concurrency вы можете определить, сколько параллельных выполнений вы можете запустить одновременно.

Таким образом, код, начиная с вашего примера, может выглядеть примерно так:

const concurrency = 1;
function getRandomInt(min, max) {
  min = Math.ceil(min);
  max = Math.floor(max);
  return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive
}

const queuePromises = (
    [...(Array(20)).keys()]
    .map(number => new Promise(
      (resolve, reject) => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */)))
) ;

from(queuePromises).pipe(
  mergeMap(qp => from(qp), concurrency)
)
.subscribe(
  number => console.log(number, 'resolved') 
  , error => console.error('Individual Promise error', error),
  () => console.log('all are resolved')
)

Установка значенияconcurrency к 1 позволяет вам видеть, что на самом деле у вас есть результаты обещаний, поступающих последовательно упорядоченным образом.

...