Как правильно обрабатывать противодавление в реализации Transform метода _flush
?Другими словами, если .push()
возвращает false во время очистки, существуют ли какие-либо механизмы для правильной обработки обратного давления из нисходящего потока?
Документация предписывает прекратить нажатие, как только .push()
вернет false, но затем преобразованиене имеет возможности прослушивать, когда нижестоящий поток хочет возобновить чтение, кроме как переопределить this.read
;но как бы это выглядело и есть ли опасность для этого?
Вот рабочий пример, с которым вы можете поиграть.
const stream = require('stream');
// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);
class example extends stream.Transform {
constructor() {
super({
writableObjectMode: true,
});
// some internal queue that will be emptied once writable side ends
Object.assign(this, {
internal_queue: [],
});
}
_transform(g_chunk, s_encoding, fk_transform) {
// store chunk in internal queue
this.internal_queue.push(g_chunk);
// done with transform (no writes)
fk_transform();
}
_flush(fk_flush) {
console.warn('starting to flush');
// now that writable side has ended, flush internal queue
this.resumeFlush(fk_flush);
}
resumeFlush(fk_flush) {
let a_queue = this.internal_queue;
// still data left in internal queue
while(a_queue.length) {
// remove an item from queue
a_queue.pop();
// intentionally overflow buffer
if(!this.push(S_OVERFLOW)) {
//
// WHAT TO DO HERE?
//
// go asynchronous
return;
}
}
console.warn('finished flush');
// callback
fk_flush();
}
}
// instantiate transform
let ds_transform = new example();
// pipe to stdout
ds_transform.pipe(process.stdout);
// write some data (needs to happen twice)
ds_transform.write({
item: 0,
});
ds_transform.write({
item: 1,
});
// end stream
ds_transform.end();
Передача stdout в /dev/null
, чтобы stderr все ещевывод на консоль:
$ node transform.js > /dev/null
starting to flush