Я хотел бы добиться следующего с помощью RxJs:
- Групповое сообщение, которое находится в пределах ~ 200 мс от предыдущего сообщения
- Излучать группу сообщений, если в течение 250 мс не было получено никаких новых сообщений
- Излучать группу сообщений, когда группа достигает 10 элементов.
Благодаря нескольким другим вопросам по SO, таким как этот , довольно просто реализовать 1 и 2, используя комбинацию buffer
и debounceTime
, например, так:
const subject$ = new Subject<number>();
// Create the debounce
const notifier$ = subject$.pipe(
debounceTime(250)
);
// Subscribe to the subject using buffer and debounce
subject$
.pipe(
buffer(notifier$)
)
.subscribe(value => console.log(value));
// Add a number to the subject every 200ms untill it reaches 10
interval(200)
.pipe(
takeWhile(value => value <= 10),
)
.subscribe(value => subject$.next(value));
Здесь сообщения буферизуются до тех пор, пока они отправляются в пределах 200 мс от последнего. Если это занимает более 200 мс, запускается новый буфер. Однако, если сообщения продолжают поступать в течение 200 мс, они могут быть сохранены в буфере навсегда. Вот почему я хочу добавить жесткое ограничение на размер буфера.
Я создал пример на StackBlitz , чтобы продемонстрировать откат буфера. Но я не могу понять, как ограничить буфер так, чтобы он испускался, когда он достигает 10 элементов.