Как добавить ограничение на буферизацию и откровение в подписке RxJs - PullRequest
1 голос
/ 22 марта 2019

Я хотел бы добиться следующего с помощью RxJs:

  1. Групповое сообщение, которое находится в пределах ~ 200 мс от предыдущего сообщения
  2. Излучать группу сообщений, если в течение 250 мс не было получено никаких новых сообщений
  3. Излучать группу сообщений, когда группа достигает 10 элементов.

Благодаря нескольким другим вопросам по SO, таким как этот , довольно просто реализовать 1 и 2, используя комбинацию buffer и debounceTime, например, так:

const subject$ = new Subject<number>();

// Create the debounce
const notifier$ = subject$.pipe(
  debounceTime(250)
);

// Subscribe to the subject using buffer and debounce
subject$
  .pipe(
    buffer(notifier$)
  )
  .subscribe(value => console.log(value));

// Add a number to the subject every 200ms untill it reaches 10
interval(200)
  .pipe(
    takeWhile(value => value <= 10),
  )
  .subscribe(value => subject$.next(value));

Здесь сообщения буферизуются до тех пор, пока они отправляются в пределах 200 мс от последнего. Если это занимает более 200 мс, запускается новый буфер. Однако, если сообщения продолжают поступать в течение 200 мс, они могут быть сохранены в буфере навсегда. Вот почему я хочу добавить жесткое ограничение на размер буфера.

Я создал пример на StackBlitz , чтобы продемонстрировать откат буфера. Но я не могу понять, как ограничить буфер так, чтобы он испускался, когда он достигает 10 элементов.

Ответы [ 2 ]

1 голос
/ 23 марта 2019

Мы могли бы создать другой уведомитель, чтобы ограничить количество элементов (например, с помощью elementAt), использовать уведомитель, который излучает первым (с race), и применить его рекурсивно (с expand):

const notifierDebouncing$ = subject$.pipe(
  debounceTime(PERIOD),
  take(1)
);

const notifierLimiting$ = subject$.pipe(
  elementAt(AMOUNT - 1)
);

const notifier$ = interval(0).pipe(
  take(1),
  expand(_ => race(notifierLimiting$, notifierDebouncing$))
);

subject$
  .pipe(buffer(notifier$))
  .subscribe(value => console.log(value));

Как вы думаете?

Вот пример, основанный на вашем демонстрационном приложении: https://stackblitz.com/edit/rxjs-buffer-debounce-cf4qjy (откройте консоль, затем переместите курсор на 2000 мс и остановитесь на 500 мс)

0 голосов
/ 22 марта 2019

Разве вы не можете просто отфильтровать его, если это 10-й предмет?Может быть, я неправильно понял ваш вопрос.

    interval(this.interval)
      .pipe(
        filter(value => value % 10 === 0),
        takeWhile(value => value <= this.amount),
      )
      .subscribe(value => this.subject$.next(value));
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...