Я читаю данные из потока в NodeJS и затем обрабатываю эти данные, используя асинхронную функцию в потоке преобразования.Я хотел бы, чтобы этот поток преобразования инициировал несколько вызовов асинхронной функции параллельно, но, похоже, он делает это по одному.
Чтобы проиллюстрировать свои ожидания, я написал небольшую программу ниже, которая генерирует числа из 0
до limit - 1
, а затем передает это через поток преобразования, который увеличивает каждое число с небольшой задержкой.Если вы запустите программу ниже, числа от 1 до 20 будут зарегистрированы в последовательности, все с небольшой задержкой.
Я бы ожидал, что они будут записаны в виде фрагментов 16 + 4, поскольку по умолчанию highWaterMark
равно 16. Возможно ли получить поведение, которое я хочу, и если да, то как?
Т.е. поток чтения будет генерировать данные очень быстро, преобразование будет медленнее, но должно получить до максимальной отметки и затем ждать, пока его данные были обработаны, а затем запрашивать больше из потока чтения.
const stream = require('stream')
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new stream.Transform({
objectMode: true,
transform (item, _, cb) {
setTimeout(() => cb(null, item + 1), 100)
},
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)