Использование потока NodeJS хорошо подходит для описания проблемы. Противодавление потоковой библиотеки заставит каждую часть работы обрабатываться последовательно.
Пример, предоставленный OP, объединяет с использованием события data
в потоковом режиме с обработчиком событий asyn c. Это указывает NodeJS на выполнение зарегистрированного обработчика событий каждый раз, когда в конвейере появляется новый фрагмент данных / JSON. Однако NodeJS не будет автоматически ожидать функцию asyn c. Таким образом, обработчик событий asyn c выполняется последовательно для каждого фрагмента, но не будет ждать завершения обработки предыдущих фрагментов, прежде чем перейти к следующему фрагменту.
NodeJS (stream.Writable при объединении асинхронной c обработки фрагментов потока с обратным давлением. Класс Writable имеет функцию callback
, которая указывает, когда каждый фрагмент завершил обработку:
import stream from "stream";
import util from "util";
let downloadStart = Date.now();
fetch(url)
.then(response => {
const reader = response.body;
return util
.promisify(stream.pipeline)(
reader,
new stream.Writable({
write: (feature, encoding, callback) => {
try {
await sleep(numberOfMilliseconds);
if(shouldAdd(feature)){
// do work
}
callback();
}
catch(error){
callback(error);
}
}
})
)
.then(()=>{
console.log("parsed through", count, "items finished after", Date.now()-start);
})
});
Если вы работаете с NodeJS v13 +, вы можете использовать асинхронный c итератор вместо Writable, что, как правило, делает этот вариант использования более кратким:
import stream from "stream";
import util from "util";
let downloadStart = Date.now();
fetch(url)
.then(response => {
const reader = response.body;
return util
.promisify(stream.pipeline)(reader, async (source) => {
for await (const feature of source){
await sleep(numberOfMilliseconds);
if(shouldAdd(feature)){
// do work
}
}
})
.then(()=>{
console.log("parsed through", count, "items finished after", Date.now()-start);
})
});