На большом столе Prem перейдите на клей Redshift AWS - PullRequest
0 голосов
/ 20 сентября 2018

У меня есть сценарий ниже, чтобы переместить все столбцы в таблицах разных размеров, глубиной от 90 миллионов до 250 миллионов записей, из локальной базы данных Oracle в AWS Redshift.Сценарий также добавляет несколько заданных столбцов аудита:

add_metadata1 = custom_spark_df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
add_metadata2 = add_metadata1.withColumn('source_system', lit(source_system))
add_metadata3 = add_metadata2.withColumn('input_filename', lit(input_filename))
add_metadata4 = add_metadata3.withColumn('received_timestamp', lit(received_timestamp))
add_metadata5 = add_metadata4.withColumn('received_timestamp_unix', lit(received_timestamp_unix))
add_metadata6 = add_metadata5.withColumn('eff_data_date', lit(eff_data_date))

В настоящее время длительный характер задания вызывает тайм-аут соединения через 3-5 часов и поэтому никогда не завершается:

  import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## Start - Custom block of imports ##
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import datetime 
from pyspark.sql.functions import lit
## End - Custom block of imports ##

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "metadatastore", table_name = "TableName", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("...MAPPINGS OUTLINED...")], transformation_ctx = "applymapping1")

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## Start - Custom block for creation of metadata columns ##
now = datetime.datetime.now()

##line_number = '1'
## Remember to update source_system (if needed) and input_filename
source_system = 'EDW'
input_filename = 'TableName' 
received_timestamp = datetime.datetime.strptime(now.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")

received_timestamp_unix = int((now - datetime.datetime(1970,1,1)).total_seconds())

eff_data_date = datetime.datetime.strptime(now.strftime("%Y-%m-%d"), "%Y-%m-%d").date()

## Update to the last dataframe used
## Do not forget to update write_dynamic_frame to use custom_dynamic_frame for the frame name and add schema to the dbtable name
custom_spark_df = dropnullfields3.toDF()

add_metadata1 = custom_spark_df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
add_metadata2 = add_metadata1.withColumn('source_system', lit(source_system))
add_metadata3 = add_metadata2.withColumn('input_filename', lit(input_filename))
add_metadata4 = add_metadata3.withColumn('received_timestamp', lit(received_timestamp))
add_metadata5 = add_metadata4.withColumn('received_timestamp_unix', lit(received_timestamp_unix))
add_metadata6 = add_metadata5.withColumn('eff_data_date', lit(eff_data_date))

custom_dynamic_frame = DynamicFrame.fromDF(add_metadata6, glueContext, "add_metadata6")
## End - Custom block for creation of metadata columns ##

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = custom_dynamic_frame, catalog_connection = "Redshift", connection_options = {"dbtable": "schema_name.TableName", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

Как улучшить этот сценарий, чтобы сократить время выполнения и разрешить полное выполнение?

1 Ответ

0 голосов
/ 21 сентября 2018

Согласен с Сауроном.Я думаю, что вам лучше создать дамп CSV, скопируйте его, поместите в s3.Вы также можете преобразовать его в формат паркета с помощью клея, как только файл окажется в S3.Для одного сброса этот подход будет быстрее.

Чтобы задать вопрос о коде AWS Glue для загрузки в S3 из исходного кода, вам просто нужно изменить вторую последнюю строку кода, которая выполняет написание.Используйте что-то вроде ниже:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = custom_dynamic_frame, connection_type = "s3", connection_options = {"path": s3_output}, format = "parquet", transformation_ctx = "datasink4")
...