rxjs - поток буфера, пока функция не вернет true - PullRequest
5 голосов
/ 11 декабря 2019

У меня есть поток чисел, которые увеличиваются на постоянную величину, которую я хочу выделить. Учитывая постоянную выборку interval, я хочу буферизовать поток до тех пор, пока разница между первым и последним буферным значением не станет больше или равна interval. Затем он генерирует этот массив, так что он похож на оператор buffer .

Я искал различные операторы rxjs, но не могу понять, как заставить это работать. Оператор bufferUntil был бы идеальным, но, кажется, не существует. Как я могу это реализовать?

Например:

const interval = 15;
//example stream would be: 5, 10 , 15, 20, 25, 30..

Observable.pipe(
   bufferUntil(bufferedArray => {
       let last = bufferedArray.length - 1;
       return (bufferedArray[last] - bufferedArray[0] >= interval);
   })
).subscribe(x => console.log(x));

//With an expected output of [5, 10, 15, 20], [ 25, 30, 35, 40],..

Ответы [ 2 ]

4 голосов
/ 12 декабря 2019

Вы можете реализовать оператор, который поддерживает пользовательский буфер. Вставьте все входящие значения в буфер, испустите буфер, когда он соответствует заданному условию, и сбросьте егоИспользуйте defer, чтобы снабдить каждого подписчика собственным буфером.

function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => defer(() => {
    let buffer: T[] = []; // custom buffer
    return source.pipe(
      tap(v => buffer.push(v)), // add values to buffer
      switchMap(() => emitWhen(buffer) ? of(buffer) : EMPTY), // emit the buffer when the condition is met
      tap(() => buffer = []) // clear the buffer
    )
  });
}

https://stackblitz.com/edit/rxjs-7awqmv

0 голосов
/ 11 декабря 2019

Вы можете просто использовать bufferCount для вашего конкретного случая, когда числа увеличиваются на постоянную величину, а интервал также постоянен.

import { bufferCount } from 'rxjs/operators';

const amount = 5;
const interval = 15;

source.pipe(
  bufferCount(Math.ceil(interval / amount) + 1)
).subscribe(console.log)

https://stackblitz.com/edit/rxjs-jntnpg

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...