С Rx JS я изо всех сил пытаюсь найти способ обработки сочетания последовательных и параллельных заданий. Или даже изменить / изменить параллелизм обработки очереди заданий.
Вот базовый c рабочий пример обработки элементов в массиве с жестко заданным параллелизмом:
import { from, defer } from 'rxjs'
import { mergeAll } from 'rxjs/operators'
const delay = 1000
const concurrency = 2
const doSomethingSlow = async (val) => {
await new Promise(resolve => {
setTimeout(() => {
console.log(`${val} - Done`)
resolve()
}, delay)
})
}
const jobs = [
1,2,3,4,5,6,7,8,9,10
]
const observables = from(jobs.map(num => defer(() => doSomethingSlow(num))))
observables
.pipe(
mergeAll(concurrency)
)
.subscribe()
https://stackblitz.com/edit/rxjs-tyfer3?file=index.ts
То, что я хотел бы сделать, это иметь массив родительского уровня в виде списка Последовательные задачи , которые содержат подмассивы Параллельные задания . например.
const tasks = [
{ concurrency: 1, jobs: [1] },
{ concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
{ concurrency: 1, jobs: [12, 13] }
]
Можно надеяться, что сначала будет обработано задание 1
, и ничего больше не будет запущено, пока оно не закончится. Затем задания 2
и 3
выбираются параллельно, а задания 4
- 11
обрабатываются всякий раз, когда из двух доступных процессоров параллелизма открывается слот. Только когда все они закончены, работу 12
начинают выполнять, а затем 13
.
Я не могу действительно представить себе правильный подход к этому. Я попытался merge
отделить obserables
, в котором были свои pipe(mergeAll(num))
, но, похоже, это не сработало. У меня есть ощущение, что switchMap
может быть ключом здесь, но я не уверен, как именно использовать его в этом сценарии.
Альтернативная мысль, которую я имел, пыталась динамически изменить значение параллелизма , но кажется, что это может быть еще сложнее, и это может не привести к тому поведению, которое я ищу.
Любая помощь будет принята с благодарностью.