AWS Приклейте Dynami c Фрейм в JDB C операция обновления - PullRequest
0 голосов
/ 05 мая 2020

Мне нужно прочитать файл из S3, загрузить данные из db, сравнить данные и обновить таблицу в postgresql. Ниже приведен файл myinput

     tconst|averagerating|numvotes|
+---------+-------------+--------+
|tt0000001|          5.6|    1609|
|tt0000002|          6.0|     197|

Моя таблица имеет следующие столбцы

   tconst|averagerating|numvotes|last_changed_date | last_updated_user

Мое задание должно читать данные из формы S3, а также загружать данные из таблицы и сравнивать значение tconst как из файла S3, так и DB, если таблица tconst совпадает с файлом S3, обновите запись в db, иначе вставьте новую запись в db. Мне нужно выполнить приведенную ниже задачу. Не могли бы вы сообщить мне, как этого добиться.

Если значение tconst в файлах db и S3 совпадает, обновите запись? как мне выполнить операцию обновления с помощью клея

Ниже мой код, который я написал до сих пор

     glueContext = GlueContext(SparkContext.getOrCreate())
    spark = glueContext.spark_session

    db_name = "dataset"
    tbl_dataset = "dataset_data_txt"
    datasets = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_dataset)
    url = "jdbc:postgresql://database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com:5432/cloud"
    db_properties = {
        "driver": "org.postgresql.Driver",
        "user": "postgres",
        "password": "jhhk"
    }
    dbdf = spark.read.jdbc(url=url,table='textdata',properties=db_properties)
    datasetsDF = datasets.toDF()
    lookupDF = dbdf.toDF()
    df1 = fundsDF.alias('df1')
df1.withColumn("last_chngd_date", current_timestamp())
df1.withColumn("last_chngd_user", "vinay")
    df2 = lookupDF.alias('df2')

    new_filtered=df1.join(df2, df1.tconst  != df2.tconst ).select('df1.*') // records that willl be inserted
    update_filtered=df1.join(df2, df1.tconst  == df2.tconst ).select('df1.*')// recods that will be updated
    newDynamicFarems = DynamicFrame.fromDF(new_filtered, glueContext, "test_nest")
    updateDynamicFarems = DynamicFrame.fromDF(update_filtered, glueContext, "test_nest")

    # Cycle through and write to Redshift.
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = newDynamicFarems, catalog_connection = "postgresqldb", connection_options = {"dbtable": "dataset", "database": "cloud"}, transformation_ctx = "datasink4")
    datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = updateDynamicFarems, catalog_connection = "postgresqldb", connection_options = {"dbtable": "dataset", "database": "cloud"}, transformation_ctx = "datasink5")

Я прошел по ссылке SPARK SQL - обновить MySql таблица с использованием DataFrames и JDB C, но у нее нет решения для pyspark с Glue

...