Если я правильно понимаю, ваша главная цель - контролировать выполнение последовательности функций с помощью механизма, который буферизует их до тех пор, пока что-то не произойдет, и что-то именно то, что запускает выполнение буферизованных функций.
Если это так, нижеследующее может послужить основой для возможного решения вашей проблемы
const functions$ = new Subject<() => any>();
const buffer$ = new Subject<any>();
const executeBuffer$ = new Subject<any>();
const setBuffer = (executionDelay: number) => {
buffer$.next();
setTimeout(() => {
executeBuffer$.next();
}, executionDelay);
}
const functionBuffer$ = functions$
.pipe(
bufferWhen(() => buffer$),
);
zip(functionBuffer$, executeBuffer$)
.pipe(
tap(functionsAndExecuteSignal => functionsAndExecuteSignal[0].forEach(f => f()))
)
.subscribe();
Позвольте мне немного объяснить код.
Во-первых, мы строим functions$
, т. Е. Наблюдаемые функции, которыми мы хотим управлять. Observable строится с использованием Subject, поскольку мы хотим иметь возможность контролировать уведомление о таких Observable программно. Другими словами, вместо того, чтобы начинать выполнение функции, подобной этой funcToBuffer(1000)
, мы создаем функцию (как объект) и просим functions$
Observable испустить такую функцию
const aFunction = () => setTimeout(() => {console.log('I am a function that completes in 1 second');}, 1000);
functions$.next(aFunction);
Таким образом, мы создали поток функций, которые в конечном итоге будут выполняться.
Во-вторых, мы создаем еще 2 Observables, buffer$
и executeBuffer$
, снова используя Предметы. Такие Observables используются, чтобы сигнализировать, когда мы должны создать буфер из функций, выпущенных до сих пор functions$
, и когда мы должны начать выполнение буферизованных функций.
Эти последние 2 наблюдаемые используются в функции setBuffer
. Когда вы вызываете setBuffer
, вы в основном говорите: пожалуйста, создайте буфер со всеми функциями, которые были выпущены до сих пор functions$
, и начните выполнять их после executionDelay
времени, указанного в качестве параметра.
Буферизация выполняется с помощью functionBuffer$
Observable, который создается с помощью оператора bufferWhen
. В исполнительной части реализован оператор zip
, который позволяет нам устанавливать ритм выполнения функций на основе выбросов executeBuffer$
Observable.
Вы можете проверить приведенный выше код, настроив следующие тестовые данные.
let f: () => any;
setBuffer(3000);
f = () => setTimeout(() => {console.log('f1');}, 1000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f2');}, 2000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f4');}, 4000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f5');}, 5000);
functions$.next(f);
setBuffer(8000);
f = () => setTimeout(() => {console.log('f6');}, 6000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f7');}, 7000);
functions$.next(f);
setBuffer(16000);