Допустим, у нас есть некоторая input$
наблюдаемая, которая излучает Item
:
const input$: Observable<Item>;
Для каждого излучения мне нужно переключиться на другую наблюдаемую (сродни switchMap
).Однако мне нужно убедиться, что все эти переключаемые наблюдаемые завершены и работают в последовательности.Достаточно легко, у нас есть concatMap
для достижения этой цели:
input$.pipe(concatMap(item => processItem(item)))
Однако, что я хотел бы сделать вместо этого: буферизировать элементы и уменьшать их (т.е. у меня есть функция (a: Item, b: Item): Item
), пока один изэти переключаемые наблюдаемые активны.В частности, предположим, type Item = {[key: string]: string}
.В этом случае мой редуктор будет просто {...a, ...b}
.
У нас есть много операторов buffer*
, window*
и throttle*
, хотя я не могу найти простой комбинация для достижения этого поведения.
Я мог бы очень хорошо написать свой пользовательский оператор, но меня интересует , возможно ли вместо этого выразить это как (simple-ish) комбинацию некоторыхвстроенные операторы ?
Просто чтобы прояснить: наблюдаемый вывод должен выдавать значение наблюдаемого, на которое мы переключились, а не буферизованные / уменьшенные значения.Кроме того, хотя завершение / ошибка источника должны отражаться в выходных данных, любая текущая внутренняя подписка должна заканчиваться первой.
Оператор, которого я ищу, должен иметь подпись, аналогичную
bufferedConcatMap<T, R>(
project: (value: T) => Observable<R>,
reducer: (values: T[]) => T
): OperatorFunction<T, R>;
Для полноты картины приведу мраморную диаграмму оператора, которого я ищу.Это предполагает добавление в качестве редуктора, и мы просто переключаемся на вход, но с задержкой в четыре такта:
Input: ---123--|
Output: ------1--(5|)
Здесь 1
немедленно переключается на нашу задержку (так как не происходитвнутренняя подписка), и через четыре галочки мы получим результат.Поскольку, в то же время, 2
и 3
были выпущены, они буферизуются вместе и уменьшаются до 2 + 3 = 5
, что снова испускается через четыре такта, так как мы переключились на это только после возвращения 1
.