Использование RxJS, как буферизовать вызовы функций, пока не разрешится другой вызов асинхронной функции - PullRequest
0 голосов
/ 26 августа 2018

Как я могу использовать RxJS для буферизации вызовов функций до разрешения другой асинхронной функции?

Вот простой пример того, чего я хотел бы достичь

function asyncFunc(time) {
  setTimeout(() => {
      console.log('asyncFunc has resolved');
   }, time);
}

function funcToBuffer(time) {
  setTimeout(() => {
    console.log(time);
  }, time);
}

asyncFunc(3000);

funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);

asyncFunc(8000);

funcToBuffer(6000);
funcToBuffer(7000);

На данный момент этот код напечатает:

1000
2000
asyncFunc has resolved
4000
5000
6000
7000
asyncFunc has resolved

Что я хочу, чтобы это напечатало:

asyncFunc has resolved
1000
2000    
4000
5000
asyncFunc has resolved
6000
7000

По сути, я хочу какой-то поток управления, который позволит мне вызывать funcToBuffer всякий раз, когда я захочу, но под капотом я хочу, чтобы он продолжал выполняться всякий раз, когда asyncFunc выполняется и ожидает решения. После разрешения asyncFunc вызовы funcToBuffer больше не должны буферизироваться и выполняться сразу.

Я пытался играть с оператором буфера, но не смог достичь желаемого результата.

Ответы [ 3 ]

0 голосов
/ 27 августа 2018

Я начал работать над решением с combLatest, но подумал, что BehaviorSubject будет лучшим решением, как только я обдумаю его.

const { BehaviorSubject } = rxjs;
const { filter } = rxjs.operators;

let finalised$ = new BehaviorSubject(false);

function asyncFunc(time) {
  setTimeout(() => {
      console.log('asyncFunc has resolved');
      if (!finalised$.getValue()) {
        finalised$.next(true);
      }
  }, time);
}

function funcToBuffer(time) {
  finalised$.pipe(filter(finalised => finalised)).subscribe(_ => { // Filter so only fire finalised being true
    setTimeout(() => {
      console.log(time);
    }, time);
  });
}

asyncFunc(3000);

funcToBuffer(1000);
funcToBuffer(2000);
funcToBuffer(4000);
funcToBuffer(5000);

asyncFunc(8000);

funcToBuffer(6000);
funcToBuffer(7000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>
0 голосов
/ 30 августа 2018

Если я правильно понимаю, ваша главная цель - контролировать выполнение последовательности функций с помощью механизма, который буферизует их до тех пор, пока что-то не произойдет, и что-то именно то, что запускает выполнение буферизованных функций.

Если это так, нижеследующее может послужить основой для возможного решения вашей проблемы

const functions$ = new Subject<() => any>();

const buffer$ = new Subject<any>();
const executeBuffer$ = new Subject<any>();
const setBuffer = (executionDelay: number) => {
    buffer$.next();
    setTimeout(() => {
        executeBuffer$.next();
    }, executionDelay);
}

const functionBuffer$ = functions$
.pipe(
    bufferWhen(() => buffer$),
);

zip(functionBuffer$, executeBuffer$)
.pipe(
    tap(functionsAndExecuteSignal => functionsAndExecuteSignal[0].forEach(f => f()))
)
.subscribe();

Позвольте мне немного объяснить код.

Во-первых, мы строим functions$, т. Е. Наблюдаемые функции, которыми мы хотим управлять. Observable строится с использованием Subject, поскольку мы хотим иметь возможность контролировать уведомление о таких Observable программно. Другими словами, вместо того, чтобы начинать выполнение функции, подобной этой funcToBuffer(1000), мы создаем функцию (как объект) и просим functions$ Observable испустить такую ​​функцию

const aFunction = () => setTimeout(() => {console.log('I am a function that completes in 1 second');}, 1000);
functions$.next(aFunction);

Таким образом, мы создали поток функций, которые в конечном итоге будут выполняться.

Во-вторых, мы создаем еще 2 Observables, buffer$ и executeBuffer$, снова используя Предметы. Такие Observables используются, чтобы сигнализировать, когда мы должны создать буфер из функций, выпущенных до сих пор functions$, и когда мы должны начать выполнение буферизованных функций.

Эти последние 2 наблюдаемые используются в функции setBuffer. Когда вы вызываете setBuffer, вы в основном говорите: пожалуйста, создайте буфер со всеми функциями, которые были выпущены до сих пор functions$, и начните выполнять их после executionDelay времени, указанного в качестве параметра.

Буферизация выполняется с помощью functionBuffer$ Observable, который создается с помощью оператора bufferWhen. В исполнительной части реализован оператор zip, который позволяет нам устанавливать ритм выполнения функций на основе выбросов executeBuffer$ Observable.

Вы можете проверить приведенный выше код, настроив следующие тестовые данные.

let f: () => any;
setBuffer(3000);
f = () => setTimeout(() => {console.log('f1');}, 1000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f2');}, 2000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f4');}, 4000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f5');}, 5000);
functions$.next(f);
setBuffer(8000);
f = () => setTimeout(() => {console.log('f6');}, 6000);
functions$.next(f);
f = () => setTimeout(() => {console.log('f7');}, 7000);
functions$.next(f);
setBuffer(16000);
0 голосов
/ 26 августа 2018

CombineLatest, ожидающий срабатывания обоих предметов наблюдения.

const { of, combineLatest } = rxjs;
const { delay } = rxjs.operators;


let obs1$ = of(1).pipe(delay(1000));
let obs2$ = of(2).pipe(delay(2000));

let now = new Date();
combineLatest(obs1$, obs2$).subscribe(([obs1, obs2]) => {
  let ellapsed = new Date().getTime() - now.getTime();
  console.log(`${obs1} - ${obs2} took ${ellapsed}`);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js"></script>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...