узел, создающий объект потока, который уже имеет несколько потоков внутри - PullRequest
0 голосов
/ 22 сентября 2018

Я хотел бы создать поток Noj в Objetc для чтения и записи, который внутренне просто соединяет несколько потоковых объектов для обеспечения некоторого уровня абстракции для проекта.

Что было бы наиболее элегантнымспособ сделать это?

Вот пример кода, чтобы продемонстрировать, что я пытаюсь сделать.

function main(transformerType) {
    var readStream = new ReadStream();
    var writeStream = new WriteStream();
    var transformerStream;

    if (transformerType === "a") {
        transformerStream = getATransformer();
    }
    else {
        transformerStream = getDefaultTransformer();
    }

    readStream.pipe(transformerStream).pipe(writeStream);
}

function getDefaultTransfromer() {
    return new DefaultTransformerStream();
}

function getATransformer() {
    var xStream = new LibraryXStream();
    var yStream = new LibraryYStream();
    xStream.pipe(yStream);

    // This next line won't work
    // I want to return a stream-object that has a function pipe which calls pipe from yStream
    // and when getting piped to will pipe into xStream
    // sort of a wrapper that has xStream and yStream already connected
    return xxx?;
}

1 Ответ

0 голосов
/ 23 сентября 2018

Я создал класс TransformerWrapper, который, кажется, решает проблему, по крайней мере, в тестовом коде ниже.Конечно, в этом простом примере все эти потоки могут быть соединены вместе в одну линию.Однако цель состоит в том, чтобы найти способ вернуть несколько связанных потоков, которые выглядят как один объект потока, доступный для чтения / записи, в вызывающую функцию.

Если кто-то найдет что-то меньшее, я был бы признателен.

const stream = require("stream");

class TransformerWrapper extends stream.Transform {
    constructor (transformers) {
        super();
        var i = 1;

        this.firstTransformer = transformers[0];
        this.lastTransformer = transformers[transformers.length-1];

        this.lastTransformer.on("data", (chunk) => {
            this.push(chunk);
        });

        for (; i < transformers.length; i++) {
            transformers[i-1].pipe(transformers[i]);
        }
    }
    _write(chunk, encoding, callback) {
        this.firstTransformer.write(chunk, encoding, callback);
    }
}

class TransformerA extends stream.Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk + '_A');
        callback();
    }   
}

class TransformerB extends stream.Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk + '_B');
        callback();
    }   
}

class Writer extends stream.Writable {
    _write(chunk, encoding, callback) {
        console.log("writer", chunk.toString());
        callback();
    }   
}

var transformerA = new TransformerA();
var transformerB = new TransformerB();

var transformerWrapper = new TransformerWrapper([transformerA, transformerB]);
transformerWrapper.pipe(new Writer());
transformerWrapper.write("foo");
transformerWrapper.write("bar");
...