В следующем фрагменте кода этот код здесь
lambda.invoke({
FunctionName: 'orderProcessor',
InvocationType: "RequestResponse",
Payload: JSON.stringify(message)
}, function (err, result) {
if (err) {
console.error(err);
csvreadstream.resume();
} else {
var response = JSON.parse(result.Payload);
if(response.statusCode != 200) {
console.log("RESULT: " + JSON.stringify(data));
writestream.write(data);
csvreadstream.resume();
} else {
csvreadstream.resume();
}
}
});
вызывается только один раз с ошибкой
"errorType": "Error",
"errorMessage": "write after end",
"code": "ERR_STREAM_WRITE_AFTER_END",
"message": "write after end",
, хотя я использую pause () и resume ().
Делает у кого есть идеи как решить? Большое спасибо!
const AWS = require('aws-sdk');
const utils = require('./utils');
const lambda = new AWS.Lambda();
const csv = require('fast-csv');
const parse = require('csv-parser');
const s3 = new AWS.S3();
const APIGATEWAY = new AWS.APIGateway();
const APIKEY = "xxxxxxx";
exports.handler = (event) => {
console.log("Incoming Event: ", event);
const bucket = event.Records[0].s3.bucket.name;
const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const message = `File is uploaded in - ${bucket} -> ${filename}`;
console.log(message);
var csvreadstream = s3.getObject({ Bucket: bucket, Key: filename }).createReadStream();
var params = {
apiKey: APIKEY,
includeValue: true || false
};
var apiKey = APIGATEWAY.getApiKey(params).promise().then(function (apiKey) {
console.log(apiKey);
var httpsHeaders = {
"Content-Type": "application/json",
"x-api-key": apiKey.value
};
var httpsMethod = "POST";
const splittedFilename = filename.split('.');
const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
const reportBucket = 'external.transactions.reports';
const writestream = csv.format({ headers: true });
writestream
.pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
.on("end", function () {
console.log("Report written to S3 " + reportFilename);
});
var messagecounter = 0;
csvreadstream.pipe(parse())
.on('data', (data) => {
console.log(JSON.stringify(data));
csvreadstream.pause();
if (data.clientId != ''
&& data.ordeId != ''
&& data.accountId != ''
&& data.type != '') {
messagecounter += 1;
var message = {
"method": httpsMethod,
"headers": httpsHeaders,
"body": {
"type": "fileupload",
"ordeId": data.ordeId,
"accountId": data.accountId,
"accountId": data.accountId,
"openTS": Number.parseInt(data.openTS),
"closeTS": Number.parseInt(data.closeTS),
"orderType": data.orderType,
"orderValue": Number.parseFloat(data.orderValue),
}
};
message.method = "POST";
lambda.invoke({
FunctionName: 'orderProcessor',
InvocationType: "RequestResponse",
Payload: JSON.stringify(message)
}, function (err, result) {
if (err) {
console.error(err);
csvreadstream.resume();
} else {
var response = JSON.parse(result.Payload);
if(response.statusCode != 200) {
console.log("RESULT: " + JSON.stringify(data));
writestream.write(data);
csvreadstream.resume();
} else {
csvreadstream.resume();
}
}
});
}
csvreadstream.resume();
})
.on('end', () => {
console.log("File upload of file" + filename + " completed. " + messagecounter + " records processed.");
writestream.end();
});
});
};
Вот новая версия:
const AWS = require('aws-sdk');
const utils = require('./utils');
const lambda = new AWS.Lambda();
const EXTERNAL_TRANSACTIONS_URL = 'https://sqs.us-east-1.amazonaws.com/nnnnnnnnnn/External_Transactions';
const csv = require('fast-csv');
const s3 = new AWS.S3();
const APIGATEWAY = new AWS.APIGateway();
const APIKEYSAPHIRSTEIN = "xxxxxxxxxxx";
exports.handler = (event) => {
console.log("Incoming Event: ", event);
const bucket = event.Records[0].s3.bucket.name;
const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const message = `File is uploaded in - ${bucket} -> ${filename}`;
console.log(message);
var params = {
apiKey: APIKEYSAPHIRSTEIN,
includeValue: true || false
};
var apiKey = APIGATEWAY.getApiKey(params).promise().then(function (apiKey) {
console.log(apiKey);
var httpsHeaders = {
"Content-Type": "application/json",
"x-api-key": apiKey.value
};
var httpsMethod = "POST";
const splittedFilename = filename.split('.');
const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
const reportBucket = 'external.transactions.reports';
var csvreadstream = s3.getObject({ Bucket: bucket, Key: filename }).createReadStream();
const csvwritestream = csv.format({ headers: true });
csvwritestream
.pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
.on("end", function () {
console.log("Report written to S3 " + reportFilename);
});
var messagecounter = 0;
csvreadstream
.pipe(csv.parse({ headers: true }))
.on('data', function (data) {
csvreadstream.pause();
console.log(JSON.stringify(data));
if (data.mandateId != ''
&& data.orderId != ''
&& data.accountId != ''
&& data.orderType != '') {
messagecounter += 1;
var message = {
"method": httpsMethod,
"headers": httpsHeaders,
"body": {
"type": "fileupload",
"mandateId": data.mandateId,
"orderId": data.orderId,
"accountId": data.accountId,
"openTS": Number.parseInt(data.openTS),
"closeTS": Number.parseInt(data.closeTS),
"orderType": data.orderType,
"orderValue": Number.parseFloat(data.orderValue),
}
};
message.method = "POST";
console.log("Message " + messagecounter + ": " + JSON.stringify(message));
lambda.invoke({
FunctionName: 'transactionProcessor',
InvocationType: "RequestResponse",
Payload: JSON.stringify(message)
}).promise().then(function(result){
var response = JSON.parse(result.Payload);
if (response.statusCode != 200) {
console.log("RESULT: " + JSON.stringify(data));
csvwritestream.write(data);
}
})
.catch(function(error) {
console.error(error);
})
.then(function() {
console.log("INVOKE request finished");
})
}
})
.on('error', function (error) {
console.log("error");
})
.on('end', function () {
console.log("end");
csvwritestream.end();
});
});
};
lambda.invoke выполняется синхронно, однако при выполнении csvwritestream.write(data);
появляется ошибка Error [ERR_STREAM_WRITE_AFTER_END]: write after end
, которая означает, что csvwritestream.end()
в .on('end')
уже выполнено .