Извлечение двоичных значений из потока с низким потреблением памяти - PullRequest
7 голосов
/ 26 марта 2019

Я создаю сервер NodeJS с ExpressJS, который обрабатывает данные ( 50 КБ до > 100 МБ ), отправленные с помощью POST-запроса из приложения для настольного компьютера, подлежащего обработке и вернулся. Настольное приложение gzip сжимает данные перед отправкой (50 КБ становится 4 КБ).

Я хочу, чтобы сервер распаковывал данные, извлекал значения из данных (строки, целые числа, символы, массивы, json и т. Д.), Обрабатывал эти данные и затем отвечал обработанными данными.

Я начал с этого:

apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
    let outputData;
    //extract values from req.body Buffer and do math on them.
    //save processed data in outputData

    res.json({
        status: true,
        data: outputData
    });
});

Это работает, потому что body-parser распаковывает данные в буфер req.body, хранящийся в памяти. Это моя главная проблема ... использование памяти. Я не хочу хранить весь набор данных в памяти.


Чтобы решить эту проблему, я удалил body-parser и вместо этого направил поток запросов непосредственно в поток преобразования zlib:

apiRoute.route("/convert").post((req, res) =>{
    req.pipe(zlib.createGunzip());
});

Проблема в том, что я не знаю, как извлечь двоичные значения из потока.


Вот что я бы хотел сделать:

apiRoute.route("/convert").post((req, res) =>{
    let binaryStream = new stream.Transform();

    req
    .pipe(zlib.createGunzip())
    .pipe(binaryStream);

    let aValue = binaryStream.getBytes(20);//returns 20 bytes
    let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
    //etc...

});

Однако я не знаю, как это сделать. Модули, такие как Dissolve , близки, однако они требуют, чтобы вы заранее настроили логику синтаксического анализа, а all захваченных значений сохраняются в памяти.

Кроме того, я не знаю, как реагировать с помощью outputData, не загружая все это в память.


Итак, мой вопрос, как мне ...

  • Асинхронно считывать данные из потока с моей собственной скоростью и извлекать значения в
  • Отправлять обработанные данные обратно в настольное приложение, не помещая их в память

1 Ответ

2 голосов
/ 29 марта 2019

Я решил свою проблему.Я не уверен на 100%, что это лучший способ сделать это, поэтому я открыт для предложений.

Я создал подкласс stream.Transform и реализовал метод _transform.Я обнаружил, что следующий блок данных получает ввод только при вызове обратного вызова _transform.Зная это, я сохранил эту функцию обратного вызова как свойство и вызываю ее только тогда, когда мне нужен следующий блок.

getBytes(size) - это метод, который получит указанное количество байтов из текущего блока (сохраненного каксвойство) и вызовите ранее сохраненный обратный вызов, если необходим следующий фрагмент.Это делается рекурсивно, чтобы учесть разные размеры блоков и различное количество запрошенных байтов.

Затем, используя сочетание асинхронных / ожидающих и обещаний, я смог сохранить весь этот процесс асинхронным (afaik) и обратным давлением.

const {Transform} = require('stream'),
events = require('events');

class ByteStream extends Transform{

    constructor(options){
        super(options);

        this.event_emitter = new events.EventEmitter();
        this.hasStarted = false;
        this.hasEnded = false;
        this.currentChunk;
        this.nextCallback;
        this.pos = 0;

        this.on('finish', ()=>{
            this.hasEnded = true;
            this.event_emitter.emit('chunkGrabbed');
        });
    }

    _transform(chunk, enc, callback){
        this.pos = 0;
        this.currentChunk = chunk;
        this.nextCallback = callback;

        if(!this.hasStarted){
            this.hasStarted = true;
            this.event_emitter.emit('started');
        }
        else{
            this.event_emitter.emit('chunkGrabbed');
        }
    }

    doNextCallback(){
        return new Promise((resolve, reject) =>{
            this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
            this.nextCallback();
        });
    }

    async getBytes(size){
        if(this.pos + size > this.currentChunk.length)
        {
            let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);

            if(!this.hasEnded)
            {
                var newSize = size-(this.currentChunk.length - this.pos);
                //grab next chunk
                await this.doNextCallback();
                if(!this.hasEnded){
                    this.pos = 0;
                    let recurseBytes; await this.getBytes(newSize).then(bytes => {recurseBytes = bytes;});
                    bytes = Buffer.concat([bytes, recurseBytes]);
                }
            }

            return bytes;
        }
        else{
            let bytes = this.currentChunk.slice(this.pos, this.pos+size);
            this.pos += size;
            return bytes;
        }
    }
}

module.exports = {
    ByteStream : ByteStream 
}

Мой экспресс-маршрут теперь:

apiRoute.route("/convert").post((req, res)=>{

    let bStream = new ByteStream({});
    let gStream = zlib.createGunzip();

    bStream event_emitter.on('started', async () => {
        console.log("started!");

        let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
        console.log(myValue.length);
    });

    req
    .pipe(gStream)
    .pipe(bStream);
});

Проверяя событие started, я могу узнать, когда первый чанк был передан в bStream.Оттуда, это просто вопрос вызова getBytes() с моим желаемым количеством байтов, а затем присвоение обещанного значения переменной.Он делает только то, что мне нужно, хотя я еще не проводил тщательных испытаний.

...