Узел - Правильно закрывать потоки после конвейера - PullRequest
0 голосов
/ 07 апреля 2020

Допустим, у меня есть следующий код:

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;

                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}

Как мне убедиться, что я правильно закрываю потоки в каждом отдельном случае? Документы узла не сильно помогают - они просто говорят мне, что у меня будут висячие слушатели событий:

stream.pipeline () вызовет stream.destroy ( err) во всех потоках, кроме:

Доступные для чтения потоки, которые отправили 'end' или 'close'.

Доступные для записи потоки, которые отправили 'fini sh' или 'close'.

stream.pipeline () оставляет слушатели висячих событий в потоках после вызова обратного вызова. В случае повторного использования потоков после сбоя это может привести к утечкам прослушивателя событий и ошибкам проглатывания.

1 Ответ

1 голос
/ 08 апреля 2020

Итак, я нахожу, что многие из node.js составных операций потока, таких как pipeline() и .pipe(), действительно плохие / неполные при обработке ошибок. Например, если вы просто сделаете это:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });

Можно ожидать, что если при открытии readStream произошла ошибка, вы получите эту ошибку в своем обработчике ошибок здесь, но «нет», это не так. дело. Ошибка открытия этого входного файла будет обработана. В этом есть некоторая логика c, так как .pipe() возвращает выходной поток, и ошибка ввода не является ошибкой в ​​выходном потоке, но когда она не передается, очень легко пропустить ошибки во входном потоке. Операция .pipe() могла бы прослушивать ошибки во входном потоке и передавать ошибку через нее (даже если это была pipeErr или что-то другое), а затем она могла бы также правильно очистить writeStream при ошибке чтения. Но .pipe() не был реализован так тщательно. Кажется, нужно предположить, что во входном потоке никогда не будет ошибки.

Вместо этого вам нужно отдельно сохранить объект readStream и напрямую прикрепить к нему обработчик ошибок, чтобы увидеть эту ошибку. Поэтому я просто больше не доверяю этим составным вещам, и do c никогда не объясняет, как правильно обрабатывать ошибки. Я попытался взглянуть на код для pipeline(), чтобы понять, могу ли я понять обработку ошибок, и это не оказалось плодотворным занятием.

Итак, ваша конкретная проблема, кажется, может быть решена с помощью transform stream:

const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);        
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});

Как вы можете видеть, примерно 2/3 этого кода надежно обрабатывают ошибки (часть, которую встроенные операции не выполняют должным образом).

...