Передача файлов в S3-контейнеры на основе статуса задания Glue - PullRequest
0 голосов
/ 24 марта 2020
 I am new to **AWS Glue,** and my aim is to extract  transform and load files uploaded in S3 bucket to RDS instance. Also I need to transfer the files into separate S3 buckets based on the Glue Job status (Success /Failure). There will be more than one file uploaded into the initial S3 bucket. How can I get the name of the files uploaded so that i can transfer those files to appropriate buckets.

Шаг 1: Загрузить файлы в S3 bucket1. Шаг 2: Запустить функцию lamda для вызова Job1. Шаг 3: При успешном завершении передачи файла Job1 в корзину S3. Шаг 4: При неудачной передаче в другую корзину S3

.

1 Ответ

0 голосов
/ 24 марта 2020

Инициируйте лямбда-событие, прослушивающее папку, в которую вы загружаете файлы на S3. В лямбда-выражении используйте AWS Glue API для запуска задания склеивания (по сути, скрипт python в AWS Glue).

В скрипте Glue python используйте соответствующую библиотеку, такую ​​как py mysql, et c. как внешняя библиотека, упакованная с вашим скриптом python.

Выполните операции загрузки данных из S3 в таблицы RDS. Если вы используете Aurora Mysql, тогда AWS предоставил замечательную функцию «загрузка из S3», поэтому вы можете напрямую загрузить файл в таблицы (вам может потребоваться выполнить некоторые настройки в ролях PARAMETER GROUP / IAM) .

Лямбда-скрипт для вызова склеивания:

s3 = boto3.client('s3')
glue = boto3.client('glue')

def lambda_handler(event, context):
    gluejobname="<YOUR GLUE JOB NAME>"

    try:
        runId = glue.start_job_run(JobName=gluejobname)
        status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
        print("Job Status : ", status['JobRun']['JobRunState'])
    except Exception as e:
        print(e)
    raise e

Скрипт склейки:

import mysql.connector
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.context import DynamicFrame
from awsglue.transforms import *
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import SQLContext

# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

url="<RDS URL>"
uname="<USER NAME>"
pwd="<PASSWORD>"
dbase="DBNAME"


def connect():
    conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
    cur = conn.cursor()
    return cur, conn

def create_stg_table():
    cur, conn = connect()
    createStgTable1 = <CREATE STAGING TABLE IF REQUIRED>

    loadQry = "LOAD DATA FROM S3 PREFIX 'S3://PATH FOR YOUR CSV' REPLACE INTO TABLE <DB.TABLENAME> FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' IGNORE 1 LINES (@var1, @var2, @var3, @var4, @var5, @var6, @var7, @var8) SET ......;"
    cur.execute(createStgTable1)
    cur.execute(loadQry)
    conn.commit()
    conn.close()

Затем вы можете создать предупреждение об облачной проверке, в котором проверяется состояние склеивания, и, в зависимости от состояния, выполнить операции копирования файла между S3. У нас есть аналогичные установки в нашем производстве.

С уважением

Юва

...