Как использовать stream.pipeline в aws nodejs lambda - PullRequest
0 голосов
/ 23 апреля 2020

Я пытаюсь передать данные из курсора mongodb в файл s3, используя nodejs лямбду.

Ниже приведен фрагмент моего кода.

Что я заметил, так это лямбда не ждет завершения конвейера и существует, поэтому файл не записывается в s3.

Но то же самое прекрасно работает, если я запускаю его как отдельный node.js скрипт.

const logger = require('./logger').logger;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const util = require('util');
const pipeline = util.promisify(stream.pipeline);

exports.handler =  async (event, context) => {


    await pipeline(
        client.db("somedb").collection("somecollection").aggregate(crtiriaObj).stream({transform: x => `${JSON.stringify(x)}\n`}),
        uploadFromStream()
    )

};

let uploadFromStream =  () => {

    let pass = new stream.PassThrough();
    let s3 = new s3Client();;


    let params = {Bucket: "bucketname", Key: "filename", Body: pass};

    s3.upload(params, function(err, data) {
        if (err) {
            logger.error(`Error uploading file ${fileName}`,err);
        } else {
            logger.info(`Successfully uploaded file: ${fileName}, result: ${JSON.stringify(data)}`);
        }

    });

    return pass;
};

1 Ответ

0 голосов
/ 05 мая 2020

Я закончил тем, что делал это без asyn c / await fashion.

Мой код в итоге выглядел как фрагмент кода ниже. Я также написал об этом в блоге: https://dev.to/anandsunderraman/copying-over-data-from-mongodb-to-s3-3j4g

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;


//brute force method loading all the data into an array
exports.copyData = (event, context, callback) => {

    MongoClient.connect(getDBURI(), {
            useNewUrlParser: true,
            useUnifiedTopology: true
    }).then((dbConnection) => {

        pipeline(
            dbConnection.db("<db-name>").collection("<collection-name>").aggregate(<aggregate-criteria>)
                                        .stream({transform: x => convertToNDJSON(x)}),
            uploadDataToS3(callback),
            (err) => {
                if (err) {
                    console.log('Pipeline failed.', err);
                } else {
                    console.log('Pipeline succeeded.');
                }
            }
        )

    })


}
/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
    //best practice is to fetch the password from AWS Parameter store
    return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
    return JSON.stringify(data) + "\n";
};

let uploadDataToS3 =  (callback) => {
    let env = process.env;
    let s3 = null;
    let pass = new stream.PassThrough();
    if (env === 'local') {
        s3  = new s3Client({
            accessKeyId: 'minioadmin' ,
            secretAccessKey: 'minioadmin' ,
            endpoint: 'http://host.docker.internal:9000' ,
            s3ForcePathStyle: true, // needed with minio?
            signatureVersion: 'v4'
        });
    } else {
        s3 = new s3Client();
    }
    //using multipart upload to speed up the process
    let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: data};
    let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};

    s3.upload(params,opts, function(err, data) {
        if (err) {
            console.log(`Error uploading file ${file-name}`,err);
        } else {
            console.log(`Successfully uploaded file: ${file-name}, result: ${JSON.stringify(data)}`);
        }
        callback();

    });
    return pass;

};
...