Попробуйте вставить простой оператор tap(d => console.log(JSON.stringify(d))),
до take(10)
и после bufferTime
, и вы увидите, что происходит.
По сути, у вас есть цикл, который испускает 100 событий. Вне этого цикла вы создаете Observable, который в 100 раз уведомляет объект, испускаемый вашим циклом событий.
Все это происходит синхронно в вашем примере.
Затем Observable преобразуется в трубу, которая делает следующее:
- сначала получает только первые 10 уведомлений через оператора
take
- затем говорит "накапливать все уведомления, которые происходят в первом
1 секунды и испускают их в виде массива "- учитывая, что
Весь процесс эмиссии происходит синхронно, и вы берете первые 10
из них
bufferTime(1000)
испустит один массив из 10 элементов
через 1 секунду
- больше ничего не будет генерироваться этим потоком, так как
take(10)
завершает источник Observable после 10 синхронных уведомлений,
поэтому оператор filter
бесполезен
- последнее, в подписке вы получаете только 1 уведомление, то есть единственное, отправленное
bufferTime
Сказал, что, вероятно, то, чего вы хотите достичь, может быть достигнуто с помощью кода в этом направлении
const ev = new EventEmitter();
function simpleObservable(maxEntries: number, waitTime: number) {
return fromEvent(ev, "log").pipe(
bufferCount(maxEntries),
filter(logs => !!logs.length),
mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
)
}
simpleObservable(10, 1000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 100; i++) {
ev.emit("log", { hello: "world" });
}
Ключевым моментом здесь является то, что сначала вы разбиваете поток событий на массивы по 10 событий каждый, и это делается с помощью оператора bufferCount
.
Затем вы вводите асинхронность, используя mergeMap
с уровнем параллелизма, равным 1 (что эквивалентно оператору concatMap
). Вы в основном преобразуете каждый массив, испускаемый bufferCount
, в другой наблюдаемый с помощью функции of
и применяете задержку в 1 секунду для каждого из новых наблюдаемых. Затем вы объединяете их так, чтобы они излучались друг за другом с разницей во времени в 1 секунду.
ОБНОВЛЕННЫЙ ОТВЕТ после комментария @Lee Benson
bufferTime
может быть ответом на вашу проблему.
bufferTime
имеет 3 параметра:
- bufferTimeSpan : указывает срок службы буфера, т. Е. Observable генерирует буфер и сбрасывает его после каждого интервала
определяется bufferTimeSpan
- bufferCreationInterval : оставить для
null
для нашей цели
- maxBufferSize : указывает максимальный размер буфера - если размер достигнут, то Observable испускает буфер и сбрасывает его
Итак, если мы используем первый и последний параметр, мы сможем достичь желаемых целей.
Это тест, который я собрал
const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds,
// just to make the thing a bit more complex
const sourceObs = merge(
fromEvent(ev, "log"),
of(1).pipe(delay(5000))
)
function simpleObservable(maxEntries: number, waitTime: number) {
return sourceObs.pipe(
bufferTime(waitTime, null, maxEntries),
filter(logs => !!logs.length),
)
}
simpleObservable(10, 2000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 19; i++) {
ev.emit("log", { hello: "world" });
}
// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
for (let i = 0; i < 17; i++) {
ev.emit("log", { hello: "world" });
}
}, 6000);