Передать параметр в конвейер данных AWS - встроенный шаблон из лямбда-функции - PullRequest
0 голосов
/ 25 сентября 2019

Я хотел бы создать конвейер данных, который будет задействован лямбда-функцией.Конвейер данных - это «Загрузка данных s3 в RDS MYSQL», сборка с использованием шаблона, предоставленного самим AWS.

Из моей лямбда-функции я не могу определить параметры для отправки в мой конвейер данных.Я хотел отправить следующие параметры в конвейер данных от лямбды,

"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?,)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"

Как это возможно ??Любая помощь Ниже приведен код Python для лямбда-выражения и определение моего трубопровода.

from __future__ import print_function
import json
import urllib
import boto3
def lambda_handler(event, context):

    client = boto3.client('datapipeline')
    print('Loading function here')
    client.activate_pipeline(
    pipelineId='df-095524176JKK0DOHDDDC',
    parameterValues=[{'id':'myRDSTableName','stringValue':'employee'}])

     return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Определение конвейера

{
  "objects": [
    {
      "output": {
        "ref": "DestinationRDSTable"
      },
      "input": {
        "ref": "S3InputDataLocation"
      },
      "dependsOn": {
        "ref": "RdsMySqlTableCreateActivity"
      },
      "name": "DataLoadActivity",
      "id": "DataLoadActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "type": "CopyActivity"
    },
    {
      "subnetId": "subnet-XXXXX",
      "instanceType": "m1.medium",
      "name": "Ec2Instance",
      "actionOnTaskFailure": "terminate",
      "securityGroups": "#{myEc2RdsSecurityGrps}",
      "id": "Ec2Instance",
      "type": "Ec2Resource",
      "terminateAfter": "1 Hours"
      "terminateAfter": "1 Hours"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "RdsMySqlTableCreateActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "id": "RdsMySqlTableCreateActivity",
      "type": "SqlActivity",
      "script": "#{myRDSCreateTableSql}"
    },
    {
      "*password": "password",
      "name": "rds_mysql",
      "id": "rds_mysql",
      "type": "RdsDatabase",
      "rdsInstanceId": "#{myRDSInstanceId}",
      "username": "#{myRDSUsername}"
    },
    {
      "name": "DataFormat1",
      "columnSeparator": "|",
      "id": "DataFormat1",
      "type": "TSV",
      "recordSeparator": "\\n"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://logs/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "DestinationRDSTable",
      "insertQuery": "#{myRDSTableInsertSql}",
      "id": "DestinationRDSTable",
      "type": "SqlDataNode",
      "table": "#{myRDSTableName}",
      "selectQuery": "select * from #{table}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormat1"
      },
      "name": "S3InputDataLocation",
      "id": "S3InputDataLocation",
      "type": "S3DataNode"
    }
  ],
  "parameters": [
    {
      "description": "RDS MySQL password",
      "id": "*myRDSPassword",
      "type": "String"
    },
    {
      "watermark": "security group name",
      "helpText": "The names of one or more EC2 security groups that have access to the RDS MySQL cluster.",
      "description": "RDS MySQL security group(s)",
      "isArray": "true",
      "optional": "true",
      "id": "myEc2RdsSecurityGrps",
      "type": "String"
    },
    {
      "description": "RDS MySQL username",
      "id": "myRDSUsername",
      "type": "String"
    },
    {
      "description": "Input S3 file path",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "helpText": "The SQL statement to insert data into the RDS MySQL table.",
      "watermark": "INSERT INTO #{table} (col1, col2, col3) VALUES(?, ?, ?) ;",
      "description": "Insert SQL query",
      "id": "myRDSTableInsertSql",
      "type": "String"
    },
    {
      "helpText": "The name of an existing table or a new table that will be created based on the create table SQL query parameter below.",
      "description": "RDS MySQL table name",
      "id": "myRDSTableName",
      "type": "String"
    },
    {
      "watermark": "DB Instance",
      "description": "RDS Instance ID",
      "id": "myRDSInstanceId",
      "type": "String"
    }
  ],
  "values": {
    "myRDSInstanceId": "source-dev",
    "myRDSUsername": "username",
    "myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?,)",
    "*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
    "myInputS3Loc": "s3://services/employee/",
    "myRDSTableName": "employee"
  }
}

1 Ответ

0 голосов
/ 25 сентября 2019

Параметры для activate_pipeline задаются входом в список, поэтому

client.activate_pipeline(
    pipelineId='df-095524176JKK0DOHDDDC',
    parameterValues=[
        {
            'id':'myRDSTableName',
            'stringValue':'employee'
        },
        {
            'id':'blah',
            'stringValue':'blah'
        },
        ...
    ]
)

повторно добавляют значения параметров.Вы можете проверить больше деталей из документации boto3 .

...