Мне нужно прочитать файл из S3, загрузить данные из db, сравнить данные и обновить таблицу в postgresql. Ниже приведен файл myinput
tconst|averagerating|numvotes|
+---------+-------------+--------+
|tt0000001| 5.6| 1609|
|tt0000002| 6.0| 197|
Моя таблица имеет следующие столбцы
tconst|averagerating|numvotes|last_changed_date | last_updated_user
Мое задание должно читать данные из формы S3, а также загружать данные из таблицы и сравнивать значение tconst как из файла S3, так и DB, если таблица tconst совпадает с файлом S3, обновите запись в db, иначе вставьте новую запись в db. Мне нужно выполнить приведенную ниже задачу. Не могли бы вы сообщить мне, как этого добиться.
Если значение tconst в файлах db и S3 совпадает, обновите запись? как мне выполнить операцию обновления с помощью клея
Ниже мой код, который я написал до сих пор
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
db_name = "dataset"
tbl_dataset = "dataset_data_txt"
datasets = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_dataset)
url = "jdbc:postgresql://database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com:5432/cloud"
db_properties = {
"driver": "org.postgresql.Driver",
"user": "postgres",
"password": "jhhk"
}
dbdf = spark.read.jdbc(url=url,table='textdata',properties=db_properties)
datasetsDF = datasets.toDF()
lookupDF = dbdf.toDF()
df1 = fundsDF.alias('df1')
df1.withColumn("last_chngd_date", current_timestamp())
df1.withColumn("last_chngd_user", "vinay")
df2 = lookupDF.alias('df2')
new_filtered=df1.join(df2, df1.tconst != df2.tconst ).select('df1.*') // records that willl be inserted
update_filtered=df1.join(df2, df1.tconst == df2.tconst ).select('df1.*')// recods that will be updated
newDynamicFarems = DynamicFrame.fromDF(new_filtered, glueContext, "test_nest")
updateDynamicFarems = DynamicFrame.fromDF(update_filtered, glueContext, "test_nest")
# Cycle through and write to Redshift.
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = newDynamicFarems, catalog_connection = "postgresqldb", connection_options = {"dbtable": "dataset", "database": "cloud"}, transformation_ctx = "datasink4")
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = updateDynamicFarems, catalog_connection = "postgresqldb", connection_options = {"dbtable": "dataset", "database": "cloud"}, transformation_ctx = "datasink5")
Я прошел по ссылке SPARK SQL - обновить MySql таблица с использованием DataFrames и JDB C, но у нее нет решения для pyspark с Glue