Считывание и копирование данных инвентаризации S3 из триггера тем SNS с помощью лямбда-функции AWS - PullRequest
0 голосов
/ 30 января 2019

Я аналитик данных и новичок в лямбда-функциях AWS.У меня есть корзина s3, в которой я храню данные инвентаризации из нашего озера данных, которые создаются с помощью функции инвентаризации на вкладке S3 Management.

Допустим, данные инвентаризации (отчеты) выглядят так:

s3://my-bucket/allobjects/data/report-1.csv.gz
s3://my-bucket/allobjects/data/report-2.csv.gz
s3://my-bucket/allobjects/data/report-3.csv.gz

Независимо от содержимого файла, у меня есть настройка события для s3: // my-bucket / allobjects /данные / которые уведомляют тему SNS во время любого события, такого как GET или PUT.(Я не могу изменить этот рабочий процесс из-за строгого управления)

Теперь я пытаюсь создать лямбда-функцию с этим разделом SNS в качестве триггера и просто переместить файлы отчета об инвентаризации, созданные функцией инвентаризации S3, в

s3://my-bucket/allobjects/data/ 

и перераспределить его следующим образом:

s3://my-object/allobjects/partitiondata/year=2019/month=01/day=29/report-1.csv.gz
s3://my-object/allobjects/partitiondata/year=2019/month=01/day=29/report-2.csv.gz
s3://my-object/allobjects/partitiondata/year=2019/month=01/day=29/report-3.csv.gz

Как мне добиться этого с помощью лямбда-функции (в порядке node.js или python), читающей тему SNS?Любая помощь приветствуется.

Я пробовал что-то подобное, основываясь на некотором smaple-коде, который я нашел в Интернете, но это не помогло.

console.log('Loading function');

var AWS = require('aws-sdk');  
AWS.config.region = 'us-east-1';

exports.handler = function(event, context) {  
console.log("\n\nLoading handler\n\n");
var sns = new AWS.SNS();

sns.publish({
    Message: 'File(s) uploaded successfully',
    TopicArn: 'arn:aws:sns:_my_ARN'
}, function(err, data) {
    if (err) {
        console.log(err.stack);
        return;
    }
    console.log('push sent');
    console.log(data);
    context.done(null, 'Function Finished!');  
});
};

1 Ответ

0 голосов
/ 30 января 2019

Предпочтительным способом для события Amazon S3 является непосредственное включение функции AWS Lambda.Но поскольку вы не можете изменить этот порт, поток будет выглядеть следующим образом:

  • Событие Amazon S3 отправит сообщение в тему Amazon SNS.
  • Функция AWS Lambda подписана наТема SNS, поэтому он запускается и получает сообщение от S3.
  • Функция Lambda извлекает Bucket и Key, а затем вызывает S3 на copy_object() в другое место.(Нет команды move . Вам нужно будет скопировать объект в новое ведро / ключ.)

Содержимое eventполе выглядит примерно так:

{
    "Records": [
        {
            "EventSource": "aws:sns",
            "EventVersion": "1.0",
            "EventSubscriptionArn": "...",
            "Sns": {
                "Type": "Notification",
                "MessageId": "1c3189f0-ffd3-53fb-b60b-dd3beeecf151",
                "TopicArn": "...",
                "Subject": "Amazon S3 Notification",
                "Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-2\",\"eventTime\":\"2019-01-30T02:42:07.129Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAIZCFQCOMZZZDASS6Q\"},\"requestParameters\":{\"sourceIPAddress\":\"54.1.1.1\"},\"responseElements\":{\"x-amz-request-id\":\"...",\"x-amz-id-2\":\"..."},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"...\",\"bucket\":{\"name\":\"stack-lake\",\"ownerIdentity\":{\"principalId\":\"...\"},\"arn\":\"arn:aws:s3:::stack-lake\"},\"object\":{\"key\":\"index.html\",\"size\":4378,\"eTag\":\"...\",\"sequencer\":\"...\"}}}]}",
                "Timestamp": "2019-01-30T02:42:07.212Z",
                "SignatureVersion": "1",
                "Signature": "...",
                "SigningCertUrl": "...",
                "UnsubscribeUrl": "...",
                "MessageAttributes": {}
            }
        }
    ]
}

Таким образом, имя загруженного объекта необходимо извлечь из Message.

Вы можете использовать код, подобный следующему:

import json

def lambda_handler(event, context):

    for record1 in event['Records']:
        message = json.loads(record1['Sns']['Message'])

        for record2 in message['Records']:

            bucket = record2['s3']['bucket']['name'])
            key = record2['s3']['object']['key'])

            # Do something here with bucket and key

    return {
        'statusCode': 200,
        'body': json.dumps(event)
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...