Использование PySpark DataFrameReader Я пытаюсь читать из таблицы RDS и записывать в таблицу Redshift.
Я могу читать из CSV и записывать в таблицу следующим образом:
df = spark.read \
.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load(s3_source_location)
jdbcconf = glueContext.extract_jdbc_conf(GLUE_CONNECTION_NAME)
df.write.format("jdbc") \
.option("url", jdbcconf.get("url") + '/' + REDSHIFT_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
.option("dbtable", REDSHIFT_TABLE_NAME) \
.option("tempdir", args["TempDir"]) \
.partitionBy("Ingestion_Date") \
.mode("overwrite") \
.save()
Однако вместо чтения из CSV я хочу читать из RDS, и я пытаюсь какследует:
RDS_CONNECTION_NAME = args['GlueConnectionRDS']
RDS_DATABASE = args['RDSDatabase']
RDS_TABLE_NAME = args['RDSTable']
jdbcconf = glueContext.extract_jdbc_conf(RDS_CONNECTION_NAME)
df = spark.read.format("jdbc") \
.option("url", jdbcconf.get("url") + '/' + RDS_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
.option("dbtable", RDS_TABLE_NAME)
REDSHIFT_CONNECTION_NAME = args['GlueConnectionRedshift']
REDSHIFT_DATABASE = args['RedshiftDatabase']
REDSHIFT_TABLE_NAME = args['RedshiftTable']
jdbcconf = glueContext.extract_jdbc_conf(REDSHIFT_CONNECTION_NAME)
df.write.format("jdbc") \
.option("url", jdbcconf.get("url") + '/' + REDSHIFT_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
.option("dbtable", REDSHIFT_TABLE_NAME) \
.option("tempdir", args["TempDir"]) \
.mode("overwrite") \
.save()
Логически я ожидал бы, что это сработает, однако я должен что-то упустить в DataFrameReader.Я получаю сообщение об ошибке:
AttributeError: у объекта 'DataFrameReader' нет атрибута 'write'.
Если я пытаюсь прочитать с помощью вызова .load()
и драйвера следующим образом:
df = spark.read.format("jdbc") \
.option("url", jdbcconf.get("url") + '/' + RDS_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password') + '/') \
.option("dbtable", RDS_TABLE_NAME) \
.option("inferSchema", "true") \
.option("driver", 'org.postgresql.Driver') \
.load()
Я получаю ошибку аутентификации, хотя я знаю, что мое соединение работает.Как правильно это сделать?