Написание больших файлов с Node.js - PullRequest
24 голосов
/ 28 февраля 2012

Я пишу большой файл с помощью node.js, используя записываемый поток :

var fs     = require('fs');
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' });

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write( lines[i] );
    }
}

Мне интересно, безопасна ли эта схема без использования события drain? Если это не так (как мне кажется, это так), какова схема записи произвольных больших данных в файл?

Ответы [ 7 ]

19 голосов
/ 08 марта 2012

Вот так я наконец и сделал.Идея состоит в том, чтобы создать читаемый поток, реализующий интерфейс ReadStream , а затем использовать метод pipe() для передачи данных в поток для записи.

var fs = require('fs');
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var readStream = new MyReadStream();

readStream.pipe(writeStream);
writeStream.on('close', function () {
    console.log('All done!');
});

Можно взять пример класса MyReadStreamот мангуста QueryStream .

11 голосов
/ 28 февраля 2012

Идея, стоящая за стоком, заключается в том, что вы можете использовать его для проверки здесь:

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write(lines[i]); //<-- the place to test
    }
}

, которым вы не являетесь.Таким образом, вам нужно будет перестроить архитектуру, чтобы сделать ее «входящей».

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
        }
    }
}

Однако означает ли это, что вам нужно также сохранять буферизацию getLines во время ожидания?

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines,
    buffer = {
     remainingLines = []
    };
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
           buffer.remainingLines = lines.slice(i);
           break;
           //notice there's no way to re-run this once we leave here.
        }
    }
}

stream.on('drain',function(){
  if (buffer.remainingLines.length){
    for (var i = 0; i < buffer.remainingLines.length; i++) {
      var written = stream.write(buffer.remainingLines[i]); //<-- the place to test
      if (!written){
       //do something here to wait till you can safely write again
       //this means prepare a buffer and wait till you can come back to finish
       //  lines[i] -> remainder
       buffer.remainingLines = lines.slice(i);
      }
    }
  }
});
3 голосов
/ 05 сентября 2013

Самый простой способ справиться с этим - сделать ваш генератор строк читаемым потоком - назовем его lineReader.Затем следующее автоматически обработает буферы и удалит их для вас:

lineReader.pipe(fs.createWriteStream('someFile.txt'));

Если вы не хотите создавать читаемый поток, вы можете прослушать вывод write для заполнения буфера иответьте так:

var i = 0, n = lines.length;
function write () {
  if (i === n) return;  // A callback could go here to know when it's done.
  while (stream.write(lines[i++]) && i < n);
  stream.once('drain', write);
}
write();  // Initial call.

Более длинный пример этой ситуации можно найти здесь .

2 голосов
/ 06 декабря 2012

Я обнаружил, что потоки - это неэффективный способ работы с большими файлами - это потому, что вы не можете установить адекватный размер входного буфера (по крайней мере, я не знаю хорошего способа сделать это). Вот что я делаю:

var fs = require('fs');

var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');

var buf = new Buffer(1024 * 1024), len, prev = '';

while(len = fs.readSync(i, buf, 0, buf.length)) {

    var a = (prev + buf.toString('ascii', 0, len)).split('\n');
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : '';

    var out = '';
    a.forEach(function(line) {

        if(!line)
            return;

        // do something with your line here

        out += line + '\n';
    });

    var bout = new Buffer(out, 'ascii');
    fs.writeSync(o, bout, 0, bout.length);
}

fs.closeSync(o);
fs.closeSync(i);
2 голосов
/ 28 февраля 2012

[Изменить] Обновленный Node.js writable.write(...) Документы API скажем:

[Возвращаемое] значение строго рекомендовано.Вы МОЖЕТЕ продолжать писать, даже если он возвращает false.Однако записи будут буферизироваться в памяти, поэтому лучше не делать это чрезмерно.Вместо этого дождитесь события стока, прежде чем записывать больше данных.

[Оригинал] Из stream.write(...) документации (выделено мной):

Возвращает true, если строка была сброшена в буфер ядра.Возвращает false, чтобы указать, что буфер ядра заполнен, и данные будут отправлены в будущем .

Я понимаю, что это означает, что функция "write"возвращает true, если данная строка была немедленно записана в буфер основной ОС, или false, если она еще не была записана, но будет записана функцией записи (например, предположительно была буферизована для вас WriteStream) чтобы вам больше не приходилось звонить «пишите».

1 голос
/ 22 февраля 2018

Если у вас нет входного потока, вы не можете легко использовать pipe. Ничто из вышеперечисленного не сработало для меня, событие слива не срабатывает. Решается следующим образом (на основании ответа Тайлера):

var lines[]; // some very large array
var i = 0;

function write() {
    if (i < lines.length)  {
        wstream.write(lines[i]), function(err){
            if (err) {
                console.log(err);
            } else {
                i++;
                write();
            }
        });
    } else {
        wstream.end();
        console.log("done");
    }
};
write();
1 голос
/ 12 июля 2014

Несколько предложенных ответов на этот вопрос вообще упустили пункт о потоках.

Этот модуль может помочь https://www.npmjs.org/package/JSONStream

Однако, давайте предположим ситуацию, как описано, и напишем код сами.Вы читаете из MongoDB как поток, с ObjectMode = true по умолчанию.

Это приведет к проблемам, если вы попытаетесь выполнить прямой поток в файл - что-то вроде ошибки «Invalid non-string / buffer chunk».

Решение этого типа проблемы очень простое.

Просто поместите еще одно преобразование между читаемым и записываемым, чтобы адаптировать объект, читаемый, в строку, подходящую для записи.

Пример решения кода:

var fs = require('fs'),
    writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
    stream = require('stream'),
    stringifier = new stream.Transform();
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
    this.push(JSON.stringify(data));
    this.push('\n');
    done();
}
rowFeedDao.getRowFeedsStream(merchantId, jobId)
.pipe(stringifier)
.pipe(writeStream).on('error', function (err) {
   // handle error condition
}
...