RxJS параллельная очередь с параллельными работниками? - PullRequest
0 голосов
/ 18 января 2019

Допустим, я хочу загрузить 10000 файлов. Я могу легко построить очередь из этих 10 000 файлов (с удовольствием приму совет, если что-то из этого можно будет сделать лучше),

import request from 'request-promise-native';
import {from} from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
  reqs.push(
    from(request(`http://bleh.com/${i}`))
  )
};

Теперь у меня есть массив наблюдаемых Rx.JS, который я создал из обещаний, которые представляют мою очередь. Теперь о поведении того, что я хочу, я хочу выдать

  • Три одновременных запроса к серверу
  • По завершении запроса я бы хотел, чтобы новый запрос был выполнен.

Я могу создать решение этой проблемы, но в свете таких вещей, как очередь Rxjs , которую я никогда не использовал, мне интересно, каков самый правильный способ Rxjs сделать это.

1 Ответ

0 голосов
/ 18 января 2019

Звучит так, как будто вам нужен эквивалент forkJoin, который поддерживает максимальное количество одновременных подписок, указанных абонентом.

Возможно повторно реализовать forkJoin, используя mergeMap и выставить параметр concurrent, , как это :

import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

export function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  // Convert the array of observables to a higher-order observable:
  return from(observables).pipe(
    // Merge each of the observables in the higher-order observable
    // into a single stream:
    mergeMap((observable, observableIndex) => observable.pipe(
      // Like forkJoin, we're interested only in the last value:
      last(),
      // Combine the value with the index so that the stream of merged
      // values - which could be in any order - can be sorted to match
      // the order of the source observables:
      map(value => ({ index: observableIndex, value }))
    ), concurrent),
    // Convert the stream of last values to an array:
    toArray(),
    // Sort the array of value/index pairs by index - so the value
    // indices correspond to the source observable indices and then
    // map the pair to the value:
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}
...