У меня есть сценарий ниже, чтобы переместить все столбцы в таблицах разных размеров, глубиной от 90 миллионов до 250 миллионов записей, из локальной базы данных Oracle в AWS Redshift.Сценарий также добавляет несколько заданных столбцов аудита:
add_metadata1 = custom_spark_df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
add_metadata2 = add_metadata1.withColumn('source_system', lit(source_system))
add_metadata3 = add_metadata2.withColumn('input_filename', lit(input_filename))
add_metadata4 = add_metadata3.withColumn('received_timestamp', lit(received_timestamp))
add_metadata5 = add_metadata4.withColumn('received_timestamp_unix', lit(received_timestamp_unix))
add_metadata6 = add_metadata5.withColumn('eff_data_date', lit(eff_data_date))
В настоящее время длительный характер задания вызывает тайм-аут соединения через 3-5 часов и поэтому никогда не завершается:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## Start - Custom block of imports ##
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import datetime
from pyspark.sql.functions import lit
## End - Custom block of imports ##
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "metadatastore", table_name = "TableName", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("...MAPPINGS OUTLINED...")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## Start - Custom block for creation of metadata columns ##
now = datetime.datetime.now()
##line_number = '1'
## Remember to update source_system (if needed) and input_filename
source_system = 'EDW'
input_filename = 'TableName'
received_timestamp = datetime.datetime.strptime(now.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
received_timestamp_unix = int((now - datetime.datetime(1970,1,1)).total_seconds())
eff_data_date = datetime.datetime.strptime(now.strftime("%Y-%m-%d"), "%Y-%m-%d").date()
## Update to the last dataframe used
## Do not forget to update write_dynamic_frame to use custom_dynamic_frame for the frame name and add schema to the dbtable name
custom_spark_df = dropnullfields3.toDF()
add_metadata1 = custom_spark_df.withColumn('line_number', F.row_number().over(Window.orderBy(lit(1))))
add_metadata2 = add_metadata1.withColumn('source_system', lit(source_system))
add_metadata3 = add_metadata2.withColumn('input_filename', lit(input_filename))
add_metadata4 = add_metadata3.withColumn('received_timestamp', lit(received_timestamp))
add_metadata5 = add_metadata4.withColumn('received_timestamp_unix', lit(received_timestamp_unix))
add_metadata6 = add_metadata5.withColumn('eff_data_date', lit(eff_data_date))
custom_dynamic_frame = DynamicFrame.fromDF(add_metadata6, glueContext, "add_metadata6")
## End - Custom block for creation of metadata columns ##
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = custom_dynamic_frame, catalog_connection = "Redshift", connection_options = {"dbtable": "schema_name.TableName", "database": "dev"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
Как улучшить этот сценарий, чтобы сократить время выполнения и разрешить полное выполнение?