Node.js закройте записывающий поток в конце чтения - PullRequest
1 голос
/ 17 марта 2020

Я читаю файл CSV, хранящийся на AWS S3. Эти записи CSV оцениваются с помощью функции filterLogi c (). Те записи, которые не прошли тест, также должны быть записаны в CSV-файл отчета об ошибках на AWS S3. Для анализа csv я использую fast-csv.

Однако для последней записи я получаю "Error [ERR_STREAM_WRITE_AFTER_END]: write after end"

Где и как мне нужно правильно вызвать csvwritestream.end()?

const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';

    const csvwritestream = csv.format({ headers: true });
    csvwritestream
        .pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
        .on('end', function () {
            console.log("Report written to S3 " + reportFilename);
        });

    var request = s3.getObject({ Bucket: bucket, Key: filename });
    var stream = request.createReadStream({ objectMode: true })
        .pipe(csv.parse({ headers: true }))
        .on('data', async function (data) {
            stream.pause();
            console.log("JSON: " + JSON.stringify(data));
            var response = await utils.filterLogic(data);
            if (response.statusCode !== 200) {
                await csvwritestream.write(data);
                console.log("Data: " + JSON.stringify(data) + " written."); 
            }
            stream.resume();
        })
        .on('end', function(){
            csvwritestream.end();   
        });
    return new Promise(resolve => {
        stream.on('close', async function () {
            csvwritestream.end();
            resolve();
        });
    });
};

1 Ответ

0 голосов
/ 02 апреля 2020

Причина вашей ошибки проста и не проста. Основная проблема заключается в том, что прослушиватель data не ожидается в node.js отправителе событий - так что вы можете ожидать, что обработка данных начинается по порядку, то время, когда оно завершается нет. Теперь, учитывая это, убедитесь, что событие end запускается сразу после последнего события data, и, следовательно, запись некоторых из последних обработанных элементов выполняется после закрытия потока записи ... и да, я вас знаю сделал паузу, но в async function вы сделали это после того, как синхронный материал был обработан узлом, а end является одним из.

Что вы могли бы сделать, это реализовать поток Transform, но это, учитывая ваше использование дело может быть намного сложнее, чем просто использование другого модуля - например, мой scramjet, который позволит вам запускать асинхронный код в фильтрах данных.

const AWS = require('aws-sdk');
const utils = require('./utils');
const {StringStream} = require('scramjet');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';

    var request = s3.getObject({ Bucket: bucket, Key: filename });
    var stream = StringStream
        // create a StringStream from a scramjet stream
        .from(
            request.createReadStream()
        )
        // then just parse the data
        .CSVParse({headers: true})
        // then filter using asynchronous function as if it was an array
        .filter(async data => {
            var response = await utils.filterLogic(data);
            return response.statusCode === 200;
        })
        // then stringify
        .CSVStringify({headers: true})
        // then upload
        .pipe(
            utils.uploadFromStream(s3, reportBucket, reportFilename)
        );

    return new Promise(
        (res, rej) => stream
            .on("finish", res)
            .on("error", rej)
    );
};

Scramjet позаботится о создании потоковый конвейер из вышеперечисленных методов, поэтому вам не нужно приостанавливать / возобновлять - все об этом позаботятся.

Возможно, вы захотите прочитать:

О, и scramjet добавляет только 3 deps, поэтому ваши модули узлов выиграли ' быть в этом случае lativisti c шутка. ;)

...