Как использовать Rx JS для координации как последовательных, так и параллельных заданий? - PullRequest
1 голос
/ 12 марта 2020

С Rx JS я изо всех сил пытаюсь найти способ обработки сочетания последовательных и параллельных заданий. Или даже изменить / изменить параллелизм обработки очереди заданий.

Вот базовый c рабочий пример обработки элементов в массиве с жестко заданным параллелизмом:

import { from, defer } from 'rxjs'
import { mergeAll } from 'rxjs/operators'

const delay = 1000
const concurrency = 2

const doSomethingSlow = async (val) => {
  await new Promise(resolve => {
    setTimeout(() => {
      console.log(`${val} - Done`)
      resolve()
    }, delay)
  })
}

const jobs = [
  1,2,3,4,5,6,7,8,9,10
]
const observables = from(jobs.map(num => defer(() => doSomethingSlow(num))))

observables
  .pipe(
    mergeAll(concurrency)
  )
  .subscribe()

https://stackblitz.com/edit/rxjs-tyfer3?file=index.ts

То, что я хотел бы сделать, это иметь массив родительского уровня в виде списка Последовательные задачи , которые содержат подмассивы Параллельные задания . например.

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
]

Можно надеяться, что сначала будет обработано задание 1, и ничего больше не будет запущено, пока оно не закончится. Затем задания 2 и 3 выбираются параллельно, а задания 4 - 11 обрабатываются всякий раз, когда из двух доступных процессоров параллелизма открывается слот. Только когда все они закончены, работу 12 начинают выполнять, а затем 13.

Я не могу действительно представить себе правильный подход к этому. Я попытался merge отделить obserables, в котором были свои pipe(mergeAll(num)), но, похоже, это не сработало. У меня есть ощущение, что switchMap может быть ключом здесь, но я не уверен, как именно использовать его в этом сценарии.

Альтернативная мысль, которую я имел, пыталась динамически изменить значение параллелизма , но кажется, что это может быть еще сложнее, и это может не привести к тому поведению, которое я ищу.

Любая помощь будет принята с благодарностью.

1 Ответ

1 голос
/ 12 марта 2020

mergeMap имеет необязательный параметр concurrent, который сообщает ему, на сколько внутренних наблюдений следует подписаться одновременно. Фактически, concatMap - это mergeMap с 1 параллелизмом. Таким образом, вы можете использовать это в своих интересах:

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
];

from(tasks)
  .pipe(
    concatMap(task => from(task.jobs).pipe( // single task such as `{ concurrency: 1, jobs: [1] }`
      mergeMap(job => doSyncJob(job), task.concurrency) // `task.concurrency` is the important part here
      // maybe `toArray()` to collect all results for this task into a single array
    )),
  )
  .subscribe(...);

...