Запуск группы наблюдаемых в кусках в RxJS - PullRequest
3 голосов
/ 25 марта 2020

Я пытаюсь запустить пул наблюдаемых по частям и добавить интервал между ними. Я попробовал следующий код:

    let i = 0;
    from([].constructor(20)).pipe(
      concatMap(a => of(i).pipe(delay(1000))), // add a delay
      mergeMap(obj => of(i++), 5) // run 5 in parallel
    ).subscribe(res => {
      console.log('done', new Date().toISOString(), res);
    });

Теперь это добавляет задержку ко всем наблюдаемым, поэтому я получаю вывод:

done 2020-03-25T09:23:34.151Z 0
done 2020-03-25T09:23:35.151Z 1
done 2020-03-25T09:23:36.151Z 2
done 2020-03-25T09:23:37.151Z 3
done 2020-03-25T09:23:38.151Z 4
done 2020-03-25T09:23:39.151Z 5
done 2020-03-25T09:23:40.153Z 6
done 2020-03-25T09:23:41.155Z 7
done 2020-03-25T09:23:42.161Z 8
done 2020-03-25T09:23:43.163Z 9
done 2020-03-25T09:23:44.167Z 10
done 2020-03-25T09:23:45.170Z 11
done 2020-03-25T09:23:46.171Z 12
done 2020-03-25T09:23:47.177Z 13
done 2020-03-25T09:23:48.178Z 14
done 2020-03-25T09:23:49.182Z 15
done 2020-03-25T09:23:50.183Z 16
done 2020-03-25T09:23:51.186Z 17
done 2020-03-25T09:23:52.188Z 18
done 2020-03-25T09:23:53.192Z 19

Как вы можете видеть, он запускает каждый из них и добавляет 1 вторая задержка Чего я хочу добиться, так это запустить сначала 5 параллелей, а затем добавить 1 секунду задержки, а затем запустить следующие 5 и т. Д.

Я даже пытался поменять местами порядок mergeMap и concatMap в конвейере, но это дает тот же результат.

Есть мысли о том, как это сделать?

1 Ответ

3 голосов
/ 25 марта 2020

Вы можете использовать bufferCount для создания пакетов, а затем forkJoin для их параллельного запуска:

let i = 0;
const createRequest = () => of(i++);

from([].constructor(20)).pipe(
  bufferCount(5),
  concatMap(chunk => forkJoin(chunk.map(createRequest))
    .pipe(delay(1000)) // add a delay
  ),
).subscribe(res => {
  console.log('done', new Date().toISOString(), res);
});

Демонстрационная версия: https://stackblitz.com/edit/rxjs-jvhvqx

...