Node.js чтение потока csv не приостанавливается () - PullRequest
0 голосов
/ 05 марта 2020

В следующем фрагменте кода этот код здесь

                        lambda.invoke({
                            FunctionName: 'orderProcessor',
                            InvocationType: "RequestResponse",
                            Payload: JSON.stringify(message)
                        }, function (err, result) {
                            if (err) {
                                console.error(err);
                                csvreadstream.resume();
                            } else {
                                var response = JSON.parse(result.Payload);
                                if(response.statusCode != 200) {
                                    console.log("RESULT: " + JSON.stringify(data));
                                    writestream.write(data);
                                    csvreadstream.resume();
                                } else {
                                    csvreadstream.resume();
                                }
                            }
                        });

вызывается только один раз с ошибкой

"errorType": "Error",
"errorMessage": "write after end",
"code": "ERR_STREAM_WRITE_AFTER_END",
"message": "write after end",

, хотя я использую pause () и resume ().

Делает у кого есть идеи как решить? Большое спасибо!

const AWS = require('aws-sdk');
const utils = require('./utils');
const lambda = new AWS.Lambda();
const csv = require('fast-csv');
const parse = require('csv-parser');
const s3 = new AWS.S3();
const APIGATEWAY = new AWS.APIGateway();
const APIKEY = "xxxxxxx";


exports.handler = (event) => {

    console.log("Incoming Event: ", event);
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    var csvreadstream = s3.getObject({ Bucket: bucket, Key: filename }).createReadStream();

    var params = {
        apiKey: APIKEY,
        includeValue: true || false
    };
    var apiKey = APIGATEWAY.getApiKey(params).promise().then(function (apiKey) {
        console.log(apiKey);
        var httpsHeaders = {
            "Content-Type": "application/json",
            "x-api-key": apiKey.value
        };
        var httpsMethod = "POST";

        const splittedFilename = filename.split('.');
        const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
        const reportBucket = 'external.transactions.reports';

        const writestream = csv.format({ headers: true });
        writestream
            .pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
            .on("end", function () {
                console.log("Report written to S3 " + reportFilename);
            });
        var messagecounter = 0;
        csvreadstream.pipe(parse())
                .on('data', (data) => {
                    console.log(JSON.stringify(data));
                    csvreadstream.pause();
                    if (data.clientId != ''
                    && data.ordeId != ''
                    && data.accountId != ''
                    && data.type != '') {
                        messagecounter += 1;
                        var message = {
                            "method": httpsMethod,
                            "headers": httpsHeaders,
                            "body": {
                                "type": "fileupload",
                                "ordeId": data.ordeId,
                                "accountId": data.accountId,
                                "accountId": data.accountId,
        
                                "openTS": Number.parseInt(data.openTS),
                                "closeTS": Number.parseInt(data.closeTS),
        
                                "orderType": data.orderType,
        
                                "orderValue": Number.parseFloat(data.orderValue),
                            }
                        };
                        message.method = "POST";
                        lambda.invoke({
                            FunctionName: 'orderProcessor',
                            InvocationType: "RequestResponse",
                            Payload: JSON.stringify(message)
                        }, function (err, result) {
                            if (err) {
                                console.error(err);
                                csvreadstream.resume();
                            } else {
                                var response = JSON.parse(result.Payload);
                                if(response.statusCode != 200) {
                                    console.log("RESULT: " + JSON.stringify(data));
                                    writestream.write(data);
                                    csvreadstream.resume();
                                } else {
                                    csvreadstream.resume();
                                }
                            }
                        });
                    }
                    csvreadstream.resume();
            })
            .on('end', () => {
                console.log("File upload of file" + filename + " completed. " + messagecounter + " records processed.");
                writestream.end();
            });
    });
};

Вот новая версия:

const AWS = require('aws-sdk');
const utils = require('./utils');
const lambda = new AWS.Lambda();
const EXTERNAL_TRANSACTIONS_URL = 'https://sqs.us-east-1.amazonaws.com/nnnnnnnnnn/External_Transactions';
const csv = require('fast-csv');
const s3 = new AWS.S3();
const APIGATEWAY = new AWS.APIGateway();
const APIKEYSAPHIRSTEIN = "xxxxxxxxxxx";


exports.handler = (event) => {
    console.log("Incoming Event: ", event);
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    var params = {
        apiKey: APIKEYSAPHIRSTEIN,
        includeValue: true || false
    };
    var apiKey = APIGATEWAY.getApiKey(params).promise().then(function (apiKey) {
        console.log(apiKey);
        var httpsHeaders = {
            "Content-Type": "application/json",
            "x-api-key": apiKey.value
        };
        var httpsMethod = "POST";

        const splittedFilename = filename.split('.');
        const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
        const reportBucket = 'external.transactions.reports';
        var csvreadstream = s3.getObject({ Bucket: bucket, Key: filename }).createReadStream();
        const csvwritestream = csv.format({ headers: true });
        csvwritestream
            .pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
            .on("end", function () {
                console.log("Report written to S3 " + reportFilename);
            });
        var messagecounter = 0;
        csvreadstream
            .pipe(csv.parse({ headers: true }))
            .on('data', function (data) {
                csvreadstream.pause();
                console.log(JSON.stringify(data));
                if (data.mandateId != ''
                    && data.orderId != ''
                    && data.accountId != ''
                    && data.orderType != '') {
                    messagecounter += 1;
                    var message = {
                        "method": httpsMethod,
                        "headers": httpsHeaders,
                        "body": {
                            "type": "fileupload",
                            "mandateId": data.mandateId,
                            "orderId": data.orderId,
                            "accountId": data.accountId,

                            "openTS": Number.parseInt(data.openTS),
                            "closeTS": Number.parseInt(data.closeTS),

                            "orderType": data.orderType,

                            "orderValue": Number.parseFloat(data.orderValue),
                        }
                    };

                    message.method = "POST";

                    console.log("Message " + messagecounter + ": " + JSON.stringify(message));

                    lambda.invoke({
                        FunctionName: 'transactionProcessor',
                        InvocationType: "RequestResponse",
                        Payload: JSON.stringify(message)
                    }).promise().then(function(result){
                        var response = JSON.parse(result.Payload);
                        if (response.statusCode != 200) {
                            console.log("RESULT: " + JSON.stringify(data));
                            csvwritestream.write(data);
                        } 
                    })
                    .catch(function(error) {
							console.error(error);
					})
					.then(function() {
					    console.log("INVOKE request finished");
					})
                }
            })
            .on('error', function (error) {
                console.log("error");
            })
            .on('end', function () {
                console.log("end");
                csvwritestream.end();
            });
    });
};

lambda.invoke выполняется синхронно, однако при выполнении csvwritestream.write(data); появляется ошибка Error [ERR_STREAM_WRITE_AFTER_END]: write after end, которая означает, что csvwritestream.end() в .on('end') уже выполнено .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...