Вы можете рассмотреть решение в этом направлении.
Сначала вы создаете тему, через которую вы отправляете все запросы, которые хотите сделать
const requests$ = new Subject<Observable<any>>()
Затем вы создаете тему, через которую вы сообщаете о состоянии клапана , т. Е. Можете ли вы выполнить запрос немедленно или нужно его буферизовать
const valve$ = new Subject<boolean>();
Теперь вы можете создать поток, который передает запросы, только если клапан открыт, т.е. если последнее значение, выданное valve$
, равно true
const openStream$ = valve$.pipe(
switchMap(valve => {
if (valve) {
return requests$;
} else {
return empty();
}
})
);
Вы также можете создать поток, который буферизует все запросы, когда клапан закрыт
const bufferedStream$ = requests$.pipe(
bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
mergeMap(bufferedCalls => bufferedCalls)
)
Теперь все, что вам нужно сделать, это merge
openStream$
и bufferedStream$
и subscribe
к полученному потоку, как это
merge(openStream$, bufferedStream$).pipe(
mergeMap(request => request)
)
.subscribe(httpCallResult => {// do stuff})
Я протестировал это решение со следующими данными, имитируя реальные вызовы http с помощью Observables of string
const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);
const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);