Параллельная очередь RxJS с параллельными работниками и обработкой каждого запроса - PullRequest
0 голосов
/ 30 июня 2019

У меня проблемы с RxJS и правильным способом обработки массива запросов.Допустим, у меня есть массив из примерно 50 запросов следующим образом:

let requestCounter = 0;
function makeRequest(timeToDelay) {
  return of('Request Complete!').pipe(delay(timeToDelay));
}

const requestArray = []
for(let i=0;i<25;i++){
  requestArray.push(makeRequest(3000)); //3 seconds request
  requestArray.push(makeRequest(1000)); //1 second request
}

Моя цель:

  • Запускать запросы параллельно
  • Только 5 могутзапустить в тот же момент
  • Когда запрос выполнен, начинается следующий в массиве
  • Когда запрос выполнен (успех или ошибка), мне нужно увеличить мою переменную 'requestCounter' наone (requestCounter ++)
  • Когда мой последний запрос в очереди выполнен, мне нужно подписаться на это событие и обработать массив, полученный в результате каждого запроса

Пока ближайший IЧтобы сделать это, следуя ответу в этом посте:

Параллельная очередь RxJS с параллельными рабочими?

Дело в том, что я обнаруживаю RxJS ипример слишком сложен для меня, и я не могу найти, как обработать счетчик для каждого запроса.

Надеюсь, вы мне поможете.(Извините за ломаный английский, это не мой родной язык)

Редактировать: Окончательное решение выглядит так:

forkJoinConcurrent<T>(
    observables: Observable<T>[],
    concurrent: number
  ): Observable<T[]> {
    return from(observables).pipe(
      mergeMap((outerValue, outerIndex) => outerValue.pipe(
        tap(// my code ),
        last(),
        catchError(error => of(error)),
        map((innerValue, innerIndex) => ({index: outerIndex, value: innerValue})),
      ), concurrent),
      toArray(),
      map(a => (a.sort((l, r) => l.index - r.index).map(e => e.value))),
    );
  }

1 Ответ

0 голосов
/ 01 июля 2019

Прежде всего вы должны использовать subject для хранения очереди запросов и взглянуть на оператор mergeMap, есть параметр concurrency, который вы можете установить для максимального параллелизма, а также переменная index для отслеживания количество звонков

https://www.learnrxjs.io/operators/transformation/mergemap.html

const requestArray=new Subject()
for(let i=0;i<25;i++){
  requestArray.next(makeRequest(3000)); //3 seconds request
  requestArray.next(makeRequest(1000)); //1 second request
}

requestArray.pipe(
mergeMap((res,index)=>of([res,index]),
res=>res,5),
map((res,index)=>{if(index===25) .... do your thing ; return res;})
).subscribe(console.log)
...