Как правильно справиться с обратным давлением во время `Transform # flush` - PullRequest
0 голосов
/ 02 января 2019

Как правильно обрабатывать противодавление в реализации Transform метода _flush?Другими словами, если .push() возвращает false во время очистки, существуют ли какие-либо механизмы для правильной обработки обратного давления из нисходящего потока?

Документация предписывает прекратить нажатие, как только .push() вернет false, но затем преобразованиене имеет возможности прослушивать, когда нижестоящий поток хочет возобновить чтение, кроме как переопределить this.read;но как бы это выглядело и есть ли опасность для этого?

Вот рабочий пример, с которым вы можете поиграть.

const stream = require('stream');

// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);


class example extends stream.Transform {
    constructor() {
        super({
            writableObjectMode: true,
        });

        // some internal queue that will be emptied once writable side ends
        Object.assign(this, {
            internal_queue: [],
        });
    }

    _transform(g_chunk, s_encoding, fk_transform) {
        // store chunk in internal queue
        this.internal_queue.push(g_chunk);

        // done with transform (no writes)
        fk_transform();
    }

    _flush(fk_flush) {
        console.warn('starting to flush');

        // now that writable side has ended, flush internal queue
        this.resumeFlush(fk_flush);
    }

    resumeFlush(fk_flush) {
        let a_queue = this.internal_queue;

        // still data left in internal queue
        while(a_queue.length) {
            // remove an item from queue
            a_queue.pop();

            // intentionally overflow buffer
            if(!this.push(S_OVERFLOW)) {
                //
                // WHAT TO DO HERE?
                //

                // go asynchronous
                return;
            }
        }

        console.warn('finished flush');

        // callback
        fk_flush();
    }
}


// instantiate transform
let ds_transform = new example();

// pipe to stdout
ds_transform.pipe(process.stdout);

// write some data (needs to happen twice)
ds_transform.write({
    item: 0,
});

ds_transform.write({
    item: 1,
});

// end stream
ds_transform.end();

Передача stdout в /dev/null, чтобы stderr все ещевывод на консоль:

$ node transform.js > /dev/null
starting to flush

1 Ответ

0 голосов
/ 05 января 2019

Настоящая проблема здесь в том, что вы должны использовать дуплекс, а не преобразование. Поскольку каждый вызов _transform фактически буферизует данные, а не применяет к ним какое-либо (а) синхронное преобразование, этот тип реализации лучше подходит в качестве дуплекса, в результате чего выполняется вызов _write() данных буфера и вызовов * 1003. * Нажимайте до тех пор, пока не будет обнаружено противодавление.

const stream = require('stream');

// a string large enough to overflow the buffer
const S_OVERFLOW = '-'.repeat((new stream.Readable()).readableHighWaterMark+1);


class example extends stream.Duplex {
    constructor() {
        super({
            writableObjectMode: true,
        });

        // some internal queue that will be emptied once writable side ends
        Object.assign(this, {
            internal_queue: [],
        });
    }

    _write(g_chunk, s_encoding, fk_write) {
        // store chunk in internal queue
        this.internal_queue.push(g_chunk);

        // done with transform (no writes)
        fk_write();
    }

    _read() {
        console.warn('called _read()');
        let a_queue = this.internal_queue;

        // still data left in internal queue
        while(a_queue.length) {
            // remove an item from queue
            a_queue.pop();

            // intentionally overflow buffer
            if(!this.push(S_OVERFLOW)) {
                // go asynchronous
                return;
            }
        }

        console.warn('finished reading');

        // nothing more to read
        this.push(null);
    }
}


// instantiate transform
let ds_transform = new example();

// pipe to stdout
ds_transform.pipe(process.stdout);

// write some data (needs to happen twice)
ds_transform.write({
    item: 0,
});

ds_transform.write({
    item: 1,
});

// end stream
ds_transform.end();

Тогда вы получите:

$ node duplex.js > /dev/null
called _read()
called _read()
called _read()
finished reading
...