У меня есть несколько дуплексных потоков с асинхронными методами очистки, но pipeline
ожидает только одного из них.
Рабочий пример кода:
const { pipeline, Readable, PassThrough, Writable } = require('stream')
const { promisify } = require('util')
const wait = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms))
}
// Send X items and close
// * -> 0, 1, 2, ..
const s1 = new Readable({
read() {
new Array(20)
.fill(0)
.map((_, i) => i)
.map(String)
.forEach(item => {
this.push(item)
})
this.push(null)
}
})
// Pass through all items
// On flush, wait a bit and emit an additional data at the end of the stream.
const s2 = new PassThrough({
async flush(cb) {
console.log('S2 Thinking')
await wait(2000)
this.push('s2')
console.log('S2 Done')
cb()
}
})
// Techincally the same as s2
const s3 = new PassThrough({
async flush(cb) {
console.log('S3 Thinking')
await wait(1000)
this.push('s3')
console.log('S3 Done')
cb()
}
})
const main = async() => {
// Wait until the pipeline is finished
await promisify(pipeline)(s1, s2, s3)
console.log('pipeline done')
}
main()
.catch(error => {
console.error(`CRITICAL FAILURE`)
console.error(error.stack)
})
Почему pipeline
разрешается, когдаs2 сделано, но не ждет s3? Я неправильно использую pipeline
?
Вывод этого
S2 Thinking
S2 Done
S3 Thinking
pipeline done
S3 Done
Но я ожидаю, что это будет
S2 Thinking
S2 Done
S3 Thinking
S3 Done
pipeline done