Как сделать наблюдаемые данные буфера, пока не произойдет событие, и прекратить буферизацию? - PullRequest
0 голосов
/ 06 мая 2020

У меня есть исходящий поток рисования от пользователя, и мне нужно подождать, пока не появятся данные профиля пользователя.

В общем, мне нужны данные буфера до тех пор, пока не произойдет событие, а затем воспроизвести эти данные и пропустить буферизацию.

Я могу сделать это с помощью внешнего массива, как показано в следующем коде ( stackblitz ):


    import { of, interval } from 'rxjs'; 
    import { map, take } from 'rxjs/operators';

    const numbers = interval(1000);
    const source = numbers.pipe(
      take(4)
    );
    let allowPerform = false;
    setTimeout(_=>{allowPerform = true},2001);
    const fifo=[];
    source.subscribe(x => {
      fifo.push(x);
      if (!allowPerform) {
         console.log('skip perform for:', x);
         return; 
      }

      let item ;  
      while ( fifo.length) {
        item = fifo.shift(); 
        console.log('perform for:', item);
      } 
    });

его вывод:


skip perform for: 0
skip perform for: 1
perform for: 0
perform for: 1
perform for: 2
perform for: 3

Однако как это сделать в RX js способом?

1 Ответ

2 голосов
/ 06 мая 2020

Вот как это можно сделать:

// Buffer until the `notifier` emits
// Then do not buffer anymore, just send the values

const src$ = interval(1000).pipe(take(10), publish(), refCount());
const notifier$ = timer(2001);

concat(
  src$.pipe(buffer(notifier$), mergeAll()),
  src$
).subscribe(console.log, null, () => console.warn('complete'));

Использование publish(), refCount() будет выполнять многоадресную рассылку переданных значений всем потребителям. Это достигается размещением Subject между источником и потребителями данных, что означает, что источник не будет подписан несколько раз.

src$.pipe(buffer(notifier$), mergeAll()), будет буферизироваться до тех пор, пока не будет выдано notifier$. Но, поскольку notifier$ также завершается, весь переданный наблюдаемый объект будет завершен, что позволит подписаться на следующий наблюдаемый объект (src$).
mergeAll() используется, потому что buffer будет генерировать массив собранных значений и с mergeAll() мы можем получить значения отдельно.

StackBlitz

...