Я думаю, что вы ищете оператор throttle
.
Вот рабочий Stackblitz .Ключом к выполнению этой работы является установка объекта конфигурации, который передается в throttle()
, что позволяет ему излучать (и обрабатывать) выбросы как ведущего, так и конечного источника, но игнорировать любые промежуточные выбросы в течение времени, в течение которого processData()
работает.
Вот ключевая функция из Stackblitz:
// Use 'from' to emit the above array one object at a time
const source$ = from(sourceData).pipe(
// Simulate a delay of 'delay * delayInterval' between emissions
concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
// Now tap in order to show the emissions on the console.
tap(data => console.log('next ', data.emit)),
// Finally, throttle as long as 'processData' is handling the emission
throttle(data => processData(data), { leading: true, trailing: true }),
).subscribe()
Коротко и красиво, и работает как требуется, за исключением одного выпуска ...
Обновление:
Эта «одна проблема» с приведенным выше кодом состоит в том, что, когда исходная наблюдаемая завершается, throttle()
отписывается от processData, фактически останавливая любую окончательную обработку, которая должна была быть выполнена.Исправление, как указал Барт ван ден Бург в комментариях ниже, заключается в использовании предмета.Я полагаю, что есть много способов сделать это, но Stackblitz был обновлен следующим кодом, который теперь работает:
// Set up a Subject to be the source of data so we can manually complete it
const source$ = new Subject();
// the data observable is set up just to emit as per the gist.
const dataSubscribe = from(sourceData).pipe(
// Simulate a delay of 'delay * delayInterval' before the emission
concatMap(data => of(data).pipe(delay(data.delay * delayInterval))),
).subscribe(data => {
console.log('next ', data.emit); // log the emission to console
source$.next(data); // Send this emission into the source
});
// Finally, subscribe to the source$ so we can process the data
const sourceSubscribe = source$.pipe(
// throttle as long as 'processData' is handling the emission
throttle(data => processData(data), { leading: true, trailing: true })
).subscribe(); // will need to manually unsubscribe later ...