Данные не преобразуются Node.js Преобразование потоков - PullRequest
0 голосов
/ 26 мая 2018

Я пытаюсь создать поток преобразования, который получает данные из socket.io, преобразовывает их в JSON и затем отправляет в stdout.Я полностью озадачен тем, почему данные просто проходят без каких-либо преобразований.Я использую библиотеку through2.Вот мой код:

getStreamNames().then(streamNames => {
        const socket = io(SOCKETIO_URL);
        socket.on('connect', () => {
            socket.emit('Subscribe', {subs: streamNames});
        });

        const stream = through2.obj(function (chunk, enc, callback) {
            callback(null, parseString(chunk))
        }).pipe(through2.obj(function (chunk, enc, callback) {
            callback(null, JSON.stringify(chunk));
        })).pipe(process.stdout);

        socket.on('m', data => stream.write(data));

    },
);

getStreamNames возвращает обещание, которое разрешается в массив имен потоков (я вызываю внешний socket.io API), а parseString принимает строку, возвращаемую изAPI и преобразует его в JSON, чтобы он был управляемым.

Мне нужна моя консоль для распечатки JSON stringify'd после того, как я проанализирую его с помощью parseString, а затем сделаю его доступным для stdout с помощью JSON.stringify.Что на самом деле происходит, так это то, что данные проходят через поток и не преобразуются.

Для справки, данные, поступающие из API, имеют странный формат, что-то вроде

field1~field2~0x23~fieldn

и поэтому мне нужен метод parseString.

Я, должно быть, что-то упустил.Есть идеи?

РЕДАКТИРОВАТЬ:

parseString:

function(value) {
    var valuesArray = value.split("~");
    var valuesArrayLenght = valuesArray.length;
    var mask = valuesArray[valuesArrayLenght - 1];
    var maskInt = parseInt(mask, 16);
    var unpackedCurrent = {};
    var currentField = 0;
    for (var property in this.FIELDS) {
        if (this.FIELDS[property] === 0) {
            unpackedCurrent[property] = valuesArray[currentField];
            currentField++;
        }
        else if (maskInt & this.FIELDS[property]) {
            if (property === 'LASTMARKET') {
                unpackedCurrent[property] = valuesArray[currentField];
            }
            else {
                unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
            }
            currentField++;
        }
    }

    return unpackedCurrent;
};

Спасибо

1 Ответ

0 голосов
/ 26 мая 2018

Проблема в том, что поток, который вы пишете, на самом деле process.stdout, потому что .pipe возвращает последний stream.Writable, так что вы можете продолжать цепочку, в вашем случае, process.stdout.

const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true

Итак, все, что вы делали, это: process.stdout.write(data) без прохождения конвейера.

Что вам нужно сделать, это присвоить свой первый поток through2 переменной stream, а затем .pipe в этом потоке.

const stream = through2.obj((chunk, enc, callback) => {
    callback(null, parseString(chunk))
});

stream
    .pipe(through2.obj((chunk, enc, callback) => {
        callback(null, JSON.stringify(chunk));
    }))
    .pipe(process.stdout);

socket.on('m', data => stream.write(data));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...