Как управлять соединением Postgres в параллельной лямбда-функции AWS? - PullRequest
0 голосов
/ 02 апреля 2019

Кто-нибудь, у кого есть опыт создания параллельной лямбда-функции AWS с Postgres?

Мне нужно создать лямбда-хрон, который будет загружать тысячи счетов в базу данных Postgres.Я должен вызывать лямбда-функцию приема одновременно для каждого счета.Проблема состоит в том, что, поскольку это происходит одновременно, каждый экземпляр функции приема создает соединение с базой данных.Это означает, что если у меня есть счет-фактура 1000 для загрузки, каждый счет будет вызывать лямбда-функцию, которая создаст соединение с базой данных 1000.Это исчерпает максимальное соединение, которое может выдержать Postgres.Некоторый экземпляр вызванной лямбда-функции вернет ошибку, сообщающую, что больше нет доступных соединений.

Какие советы вы можете дать, как решить эту проблему?

Вот некоторые фрагменты моего кода:

ingestInvoiceList.js

var AWS = require('aws-sdk');
var sftp = require('ssh2-sftp-client');

var lambda = AWS.Lambda();

exports.handler = async (evenrt) => {
   ...

        let folder_contents;
        try {
            // fetch list of Zip format invoices
            folder_contents = await sftp.list(client_folder);
        } catch (err) {
            console.log(`[${client}]: ${err.toString()}`);
            throw new Error(`[${client}]: ${err.toString()}`);
        }

        let invoiceCount = 0;

        let funcName = 'ingestInvoice';


        for (let item of folder_contents) {
            if (item.type === '-') {
                let payload = JSON.stringify({
                    invoice: item.name
                });
                let params = {
                    FunctionName: funcName,
                    Payload: payload,
                   InvocationType: 'Event'
                };


                //invo9ke ingest invoice concurrently
                let result = await new Promise((resolve) => {
                    lambda.invoke(params, (err, data) => {
                        if (err) resolve(err);
                        else resolve(data);
                    });
                });

                console.log('result: ', result);

                invoiceCount++;
            }
        }
   ...
}

ingestInvoice.js

var AWS = require('aws-sdk');
var sftp = require('ssh2-sftp-client');
var DBClient = require('db.js')l

var lambda = AWS.Lambda();

exports.handler = async (evenrt) => {
   ...

   let invoice = event.invoice;
   let client = 'client name';

   let db = new DBClient();

   try {
        console.log(`[${client}]: Extracting documents from ${invoice}`);

        try {
            // get zip file from sftp server
            await sftp.fastGet(invoice, '/tmp/tmp.zip', {});
        } catch (err) {
            throw err;
        }


        let zip;
        try {
            // extract the zip file...
            zip = await new Promise((resolve, reject) => {
                fs.readFile("/tmp/tmp.zip", async function (err, data) {
                    if (err) return reject(err);

                    let unzippedData;
                    try {
                        unzippedData = await JSZip.loadAsync(data);
                    } catch (err) {
                        return reject(err);
                    }

                    return resolve(unzippedData);
                });
            });

        } catch (err) {
            throw err;
        }

        let unibillRegEx = /unibill.+\.txt/g;

        let files = [];
        zip.forEach(async (path, entry) => {
            if (unibillRegEx.exec(entry.name)) {
                files['unibillObj'] = entry;
            } else {
                files['pdfObj'] = entry;
            }
        });


        // await db.getClient().connect();
        await db.setSchema(client);
        console.log('Schema has been set.');

        let unibillStr = await files.unibillObj.async('string');

        console.log('ingesting ', files.unibillObj.name);

        //Do ingestion queries here...
        ...

        await uploadInvoiceDocsToS3(client, files);

    } catch (err) {
        console.error(err.stack);
        throw err;
    } finally {
        try {
            // console.log('Disconnecting from database...');
            // await db.endClient();
            console.log('Disconnecting from SFTP...');
            await sftp.end();
        } catch (err) {
            console.log('ERROR: ' + err.toString());
            throw err;
        }
    }
   ...
}

db.js

var { Pool } = require('pg');

module.exports = class DBClient {
    constructor() {
    this.pool = new Pool();
   }

   async setSchema(schema) {
      await this.execQuery(`SET search_path TO ${schema}`);
   }

   async execQuery(sql) {
      return await this.pool.query(sql);
   }
}

Любой ответ будет оценен, спасибо!

1 Ответ

0 голосов
/ 02 апреля 2019

Я вижу два способа справиться с этим. В конечном итоге это зависит от того, насколько быстро вы хотите обработать эти данные.

  1. Измените параметр параллелизма для вашей лямбды на «Резервный параллелизм»: Reserve Concurrency.

Это позволит вам ограничить число одновременно работающих Lambda (см. эту ссылку для получения более подробной информации).

  1. Измените свой код, чтобы поставить в очередь работу, которая должна быть выполнена в очереди SQS. Оттуда вам нужно будет создать другую лямбду, которая будет запускаться очередью, и обрабатывать ее по мере необходимости. Эта лямбда может решить, сколько вытащить из очереди за раз, и это также, вероятно, должно быть ограничено параллелизмом. Но вы можете настроить его, например, для запуска в течение максимум 15 минут, что может быть достаточно для очистки очереди и не приведет к уничтожению БД. Или, если бы у вас был, скажем, максимальный параллелизм, равный 100, вы бы работали быстро, не убивая БД.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...