В идеале вы можете использовать bufferCount
или, в зависимости от того, как вы хотите обработать начало или окончание последовательности, когда также меньше N
событий scan
или filter
:
const N = 3;
of(1, 2, 3, 4, 5, 6, 7).pipe(
bufferCount(N, 1), // `1` means it'll emit after every emission from source
).subscribe(console.log);
... или если вы хотите пропустить выбросы после завершения цепочки:
of(1, 2, 3, 4, 5, 6, 7).pipe(
bufferCount(N, 1),
filter(arr => arr.length === N),
).subscribe(console.log);
... или если вы хотите также начать события:
of(1, 2, 3, 4, 5, 6, 7).pipe(
scan((acc, item) => [...acc, item].slice(-N), []),
).subscribe(console.log);
Демонстрация в реальном времени: https://stackblitz.com/edit/rxjs-xtnscq