Объединить фиксированное количество наблюдаемых слиянием - RxJS - PullRequest
0 голосов
/ 08 сентября 2018

Я использую RxJS v6, но этот вопрос относится и к v5.

При использовании mergeMap мой исходный массив исчезает, и, хотя я могу выполнять несколько операций параллельно, у меня больше нет возможности отслеживать, когда все те наблюдаемые, которые я отправил в mergeMap, завершены.

Пример

of([1, 2, 3, 4])
.pipe(
    mergeMap(values => values),
)
.subscribe(console.log)

// 1
// 2
// 3
// 4

Хотелось бы увидеть:

// [1, 2, 3, 4]

Единственный способ, которым я до сих пор придумал, - это получить длину массива, но я уверен, что должен быть какой-то оператор, который мне не хватает:

of([1, 2, 3, 4])
.pipe(
    switchMap(values => (
        of(values)
        .pipe(
            mergeMap(value => value),
            bufferCount(values.length),
        )
    ))
)
.subscribe(console.log)

1 Ответ

0 голосов
/ 08 сентября 2018

При использовании mergeMap мой исходный массив исчезает

Причина в том, что mergeMap принимает ObservableInput в качестве параметра функции, которую вы передаете. Javascript ArrayObservableInput и, следовательно, работает в mergeMap, а mergeMap выполняет свою работу, то есть выравнивает ObservableInput (считайте, что mergeMap ранее назывался flatMap).

Итак, как говорит @cartant, если вы хотите вернуться к массиву, вы должны использовать оператор toArray.Другими словами,

of([1, 2, 3, 4])
.pipe(
    mergeMap(value => { // do stuff with a value of an array}),
    toArray()
)
.subscribe(console.log)

эквивалентно

of([1, 2, 3, 4])
.pipe(
    map(values => values.map(value => { // do stuff with a value of an array})),
)

Если ваш массив хотя и содержит Observables, и вы хотите в конечном итоге получить значения, которые они уведомляют, когда все они излучают, чем вы должныиспользуйте forkJoin.Это простой пример

of([1, 2, 3, 4].map(n => of(n)))
.pipe(
    switchMap(observablesOfValues => forkJoin(observablesOfValues))
)
.subscribe(console.log)
...