node.js: создайте связанную пару записываемых и читаемых потоков - PullRequest
0 голосов
/ 22 сентября 2018

Я пытаюсь создать функцию, которая возвращает подключенную пару записываемых и читаемых потоков.Например:

const { writable, readable } = createStreamPair();

, где каждый конец имеет правильный интерфейс (writable instanceof stream.Readable === false и readable instanceof stream.Writable === false) в отличие от потока PassThrough.

вариант использования:

createWriteStream(filePath) {

    const { writable, readable } = createStreamPair();
    writeFile(filePath, readable);
    return writable;
}

Как создать мою createStreamPair() функцию?

Edit1

Наивный подход, который, очевидно, не работает ...

function createStreamPair() {

    var readable = new stream.Readable();
    var writable = new stream.Writable();
    readable.pipe(writable);
    return { writable, readable }
}

1 Ответ

0 голосов
/ 13 марта 2019

В тестах Node.js используется функция, которая создает два дуплексных потока, запись в один может считываться из другого, и наоборот: https://github.com/nodejs/node/blob/master/test/common/duplexpair.js

Он не является частью узла.js стандартная библиотека, но вы можете написать свою собственную.

Я представлю слегка измененную аннотированную версию здесь:

const Duplex = require('stream').Duplex;
const assert = require('assert');

// Define some unique property names.
// The actual value doesn't matter,
// so long as they're not used by Node.js for anything else.
const kCallback = Symbol('Callback');
const kOtherSide = Symbol('Other');

// Define a function `DuplexSocket` whose prototype inherits from `Duplex`
class DuplexSocket extends Duplex {
    constructor() {
        // Let Node.js initialize everything it needs to
        super();
        // Define two values we will be using
        // kCallback saves a temporary reference to a function while
        this[kCallback] = null;
        // kOtherSide will be the reference to the other side of the stream
        this[kOtherSide] = null;
    }

    _read() {
        // This is called when this side receives a push() call
        // If the other side set a callback for us to call,
        // then first clear that reference
        // (it might be immediately set to a new value again),
        // then call the function.
        const callback = this[kCallback];
        if (callback) {
            this[kCallback] = null;
            callback();
        }
    }

    _write(chunk, encoding, callback) {
        // This is called when someone writes to the stream
        // Ensure there's a reference to the other side before trying to call it
        assert.notStrictEqual(this[kOtherSide], null);
        // Ensure that the other-side callback is empty before setting it
        // If push immediately calls _read, this should never be a problem
        assert.strictEqual(this[kOtherSide][kCallback], null);
        // Let Node.js know when _read has been called
        this[kOtherSide][kCallback] = callback;
        // And finally, send the other side the data to be read
        this[kOtherSide].push(chunk);
    }

    _final(callback) {
        // Ask the other side to let us know it received our EOF request
        this[kOtherSide].on('end', callback);
        // And finally, pushing null signals the end of the stream
        this[kOtherSide].push(null);
    }
}

function makeDuplexPair() {
    // Create two pairs of 
    const clientSide = new DuplexSocket();
    const serverSide = new DuplexSocket();
    // Set the other-side reference
    clientSide[kOtherSide] = serverSide;
    serverSide[kOtherSide] = clientSide;
    // Both instances behave the same, so choice of name doesn't matter,
    // So long as they're distinguishable.
    return { clientSide, serverSide };
}

module.exports = makeDuplexPair;

Вот еще один способ создания двух потоков, один для чтения и одинЗапись в этом случае:

function makeAsymmetricalStreamPair() {
    var readableCallback;
    const readableSide = new ReadableStream;
    readableSide._read = function _read(){
        if(!readableCallback) return;
        var callback = readableCallback;
        readableCallback = null;
        callback();
    }
    const writableSide = new WritableStream;
    writableSide._write = function _write(chunk, enc, callback){
        if(readableCallback) throw new Error;
        if(typeof callback==='function') readableCallback = callback;
        readableSide.push(chunk);
    }
    writableSide._final = function _final(callback){
        readableSide.on('end', callback);
        readableSide.push(null);
    }
    return { readableSide, writableSide };
}
...