Вот как это можно сделать:
// Buffer until the `notifier` emits
// Then do not buffer anymore, just send the values
const src$ = interval(1000).pipe(take(10), publish(), refCount());
const notifier$ = timer(2001);
concat(
src$.pipe(buffer(notifier$), mergeAll()),
src$
).subscribe(console.log, null, () => console.warn('complete'));
Использование publish(), refCount()
будет выполнять многоадресную рассылку переданных значений всем потребителям. Это достигается размещением Subject
между источником и потребителями данных, что означает, что источник не будет подписан несколько раз.
src$.pipe(buffer(notifier$), mergeAll()),
будет буферизироваться до тех пор, пока не будет выдано notifier$
. Но, поскольку notifier$
также завершается, весь переданный наблюдаемый объект будет завершен, что позволит подписаться на следующий наблюдаемый объект (src$
).
mergeAll()
используется, потому что buffer
будет генерировать массив собранных значений и с mergeAll()
мы можем получить значения отдельно.
StackBlitz