Инициируйте лямбда-событие, прослушивающее папку, в которую вы загружаете файлы на S3. В лямбда-выражении используйте AWS Glue API для запуска задания склеивания (по сути, скрипт python в AWS Glue).
В скрипте Glue python используйте соответствующую библиотеку, такую как py mysql, et c. как внешняя библиотека, упакованная с вашим скриптом python.
Выполните операции загрузки данных из S3 в таблицы RDS. Если вы используете Aurora Mysql, тогда AWS предоставил замечательную функцию «загрузка из S3», поэтому вы можете напрямую загрузить файл в таблицы (вам может потребоваться выполнить некоторые настройки в ролях PARAMETER GROUP / IAM) .
Лямбда-скрипт для вызова склеивания:
s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="<YOUR GLUE JOB NAME>"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
raise e
Скрипт склейки:
import mysql.connector
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.context import DynamicFrame
from awsglue.transforms import *
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import SQLContext
# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())
url="<RDS URL>"
uname="<USER NAME>"
pwd="<PASSWORD>"
dbase="DBNAME"
def connect():
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
return cur, conn
def create_stg_table():
cur, conn = connect()
createStgTable1 = <CREATE STAGING TABLE IF REQUIRED>
loadQry = "LOAD DATA FROM S3 PREFIX 'S3://PATH FOR YOUR CSV' REPLACE INTO TABLE <DB.TABLENAME> FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4, @var5, @var6, @var7, @var8) SET ......;"
cur.execute(createStgTable1)
cur.execute(loadQry)
conn.commit()
conn.close()
Затем вы можете создать предупреждение об облачной проверке, в котором проверяется состояние склеивания, и, в зависимости от состояния, выполнить операции копирования файла между S3. У нас есть аналогичные установки в нашем производстве.
С уважением
Юва