Причина вашей ошибки проста и не проста. Основная проблема заключается в том, что прослушиватель data
не ожидается в node.js отправителе событий - так что вы можете ожидать, что обработка данных начинается по порядку, то время, когда оно завершается нет. Теперь, учитывая это, убедитесь, что событие end
запускается сразу после последнего события data
, и, следовательно, запись некоторых из последних обработанных элементов выполняется после закрытия потока записи ... и да, я вас знаю сделал паузу, но в async function
вы сделали это после того, как синхронный материал был обработан узлом, а end
является одним из.
Что вы могли бы сделать, это реализовать поток Transform, но это, учитывая ваше использование дело может быть намного сложнее, чем просто использование другого модуля - например, мой scramjet
, который позволит вам запускать асинхронный код в фильтрах данных.
const AWS = require('aws-sdk');
const utils = require('./utils');
const {StringStream} = require('scramjet');
const s3 = new AWS.S3();
exports.handler = async (event) => {
console.log("Incoming Event: ", JSON.stringify(event));
const bucket = event.Records[0].s3.bucket.name;
const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const message = `File is uploaded in - ${bucket} -> ${filename}`;
console.log(message);
const splittedFilename = filename.split('.');
const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
const reportBucket = 'external.transactions.reports';
var request = s3.getObject({ Bucket: bucket, Key: filename });
var stream = StringStream
// create a StringStream from a scramjet stream
.from(
request.createReadStream()
)
// then just parse the data
.CSVParse({headers: true})
// then filter using asynchronous function as if it was an array
.filter(async data => {
var response = await utils.filterLogic(data);
return response.statusCode === 200;
})
// then stringify
.CSVStringify({headers: true})
// then upload
.pipe(
utils.uploadFromStream(s3, reportBucket, reportFilename)
);
return new Promise(
(res, rej) => stream
.on("finish", res)
.on("error", rej)
);
};
Scramjet позаботится о создании потоковый конвейер из вышеперечисленных методов, поэтому вам не нужно приостанавливать / возобновлять - все об этом позаботятся.
Возможно, вы захотите прочитать:
О, и scramjet добавляет только 3 deps, поэтому ваши модули узлов выиграли ' быть в этом случае lativisti c шутка. ;)