Лямбда - Импорт CSV из S3 в RDS MySQL - PullRequest
7 голосов
/ 24 января 2020

У меня есть лямбда-функция, которая импортирует указанный c CSV-файл из S3 в MySQL. Тем не менее, размер файла CSV составляет около 1 ГБ. Когда я запускаю этот код, он не обрабатывается и время ожидания.

//s3 to rds
const fs = require("fs");
const AWS = require('aws-sdk');
var mysql = require('mysql');
var config = require('./config.json');
const s3 = new AWS.S3({
  accessKeyId: 'XXXXXXXXXXXXXXX',
  secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXxx'
});
var filePath = `localfilepath`;

var pool = mysql.createPool({
  host: config.dbhost,
  user: config.dbuser,
  password: config.dbpassword,
  database: config.dbname
});
pool.getConnection((err, connection) => {
  if (err) throw err;
  console.log("Connected!" + connection);

  var s3Params = {
    Bucket: '<your_bucket_name>',
    Key: '<your_key>'
  };
  s3.getObject(s3Params, function(err, result) {
    if (err) {
      throw new Error(err);
    } else {
      console.log('file stored successfully', result);
      fs.createWriteStream(filePath).write(result.Body);
      connection.query('TRUNCATE TABLE <table_name>', (err, result) => {
        if (err) {
         throw new Error(err);
        } else {
          console.log('table truncated');
          var query = `LOAD DATA LOCAL INFILE '<file_name>' INTO table <table_name> FIELDS TERMINATED BY ','  ENCLOSED BY '"' IGNORE 1 LINES `;
          connection.query(query, function(err, result) {
            if (err) throw err;
            console.log("Result: " + result);
            connection.release();
            fs.unlinkSync(filePath);
            console.log('file deleted');
          });
        }
      });
    }

  });
})

Как я могу заставить это работать?

Ответы [ 3 ]

3 голосов
/ 27 января 2020

Согласно этому потоку , они ожидают реализации в какой-то момент, однако, когда это лучший сценарий предположения.

AWS В настоящее время Lambda имеет "жесткий предел" в 512 МБ дисковое пространство в каталоге / tmp (как указано здесь ), поэтому строка fs.createWriteStream(filePath).write(result.Body); здесь не должна работать, поскольку размер файла составляет 1 ГБ. Ошибка будет что-то вроде "no space left on device" (из обзора существующих тем).

Однако загрузка файла из S3 должна работать в этом случае. Лямбда пропорционально масштабируется как по размеру памяти, так и по размеру процессора, поэтому возможен тайм-аут из-за недостатка памяти (в зависимости от того, что вы установили). Эта ссылка дает хороший индикатор того, что вам нужно для этого установить (в зависимости от того, какое содержимое вы загружаете в память по сравнению с дисковым пространством).

Я бы предложил разделить поток на блоки 512 МБ ( этот пакет может помочь) на этом этапе и их сохранение в S3 отдельно, таким образом, вы можете разделить эту операцию на 2 функции:

  1. Выборка данных и разделение на отдельные файлы s3 (также обрезка таблицы).
  2. Загрузка данных CSV обратно в RDS из S3

(Вы можете использовать События Cloudwatch для это)

2 голосов
/ 02 февраля 2020

В основном вам нужно преодолеть 2 препятствия: 1) локальное хранилище на Lambda составляет всего 512 МБ и 2) Lambda имеет ограничение времени выполнения 15 минут (которое вы должны явно настроить в своей функции)

Для решить проблему 1, вы можете использовать S3 Select . Он позволяет выполнять SQL запросов к объектам (файлам CSV и JSON файлам) в S3. Выполните запрос выбора S3 для вашего CSV-файла, и для каждой получаемой вами записи вы можете вставить ее в очередь, а другие работники вставят ее в базу данных. Вы также можете вставить напрямую в свой RDS, но это может быть медленнее.

Вот пример кода:

const AWS = require('aws-sdk');
var fs = require('fs');

const S3 = new AWS.S3();

exports.handler = async (event, context) => {
    try {
        const query = "SELECT * FROM s3object s WHERE s.id > '0'";
        const bucket = 'my-bucket';
        const key = 'data.csv';

        const params = {
            Bucket: bucket,
            Key: key,
            ExpressionType: 'SQL',
            Expression: query,
            InputSerialization: { CSV: { FileHeaderInfo: 'USE' } },
            OutputSerialization: { CSV: {} }
        }

        const data = await getDataUsingS3Select(params);
        context.succeed(data);
    } catch (error) {
        context.fail(error);
    }
};

const getDataUsingS3Select = async (params) => {
    return new Promise((resolve, reject) => {
        S3.selectObjectContent(params, (err, data) => {
            if (err) { reject(err); }

            // This is a stream of events
            data.Payload.on('data', (event) => {
                // event, there is data inside it
                if (event.Records) {
                    // do what you want with payload: send to a queue or direct to db
                    console.log('Row:', event.Records.Payload.toString('utf8'));
                }
            }).on('end', () => {
                // we arrive here after processing everything
                resolve();
            });
        });
    })
}

Если вы продолжаете превышать 15-минутный лимит, это проблема 2. Сначала добавьте предложение limit в SQL. Затем вы можете создать файл "контрольной точки" в каталоге /tmp Lambda. Вы можете сохранить id последней обработанной вами записи, чтобы при повторном запуске функции Lambda он мог прочитать этот файл, выбрать id и использовать его в предложении where вашего запроса, например:

select * from s3object s where s.id > '99' limit 50000
0 голосов
/ 29 января 2020

Если ваша основная цель - импортировать данные из файла CSV на S3 в RDS MySQL, установите флажок AWS Конвейер данных . Он уже имеет все определенные ресурсы, необходимые для выполнения этой общей задачи в Загрузка данных S3 в Amazon RDS MySQL Таблица , однако он использует экземпляр EC2. Но в то же время проще масштабировать и поддерживать решение.

...