В настоящее время Glue не поддерживает создание закладок для источников JDBC.
Вы можете внедрить upsert / merge в Redshift в задании Glue, используя параметр postactions
(код в Scala):
val fields = sourceDf.columns.mkString(",")
glueContext.getJDBCSink(
catalogConnection = "RedshiftConnectionTest",
options = JsonOptions(Map(
"database" -> "conndb",
"dbtable" -> "staging_schema.staging_table",
"postactions" ->
s"""
DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
DROP TABLE IF EXISTS staging_schema.staging_table
"""
)),
redshiftTmpDir = tempDir,
transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
Если вы просто хотите удалить существующую таблицу, вы можете использовать вместо нее параметр preactions
:
glueContext.getJDBCSink(
catalogConnection = "RedshiftConnectionTest",
options = JsonOptions(Map(
"database" -> "conndb",
"dbtable" -> "dst_schema.dst_table",
"preactions" -> "DELETE FROM dst_schema.dst_table"
)),
redshiftTmpDir = tempDir,
transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))