DataFrameReader читает из одной таблицы, пишет в другую - PullRequest
0 голосов
/ 23 сентября 2018

Использование PySpark DataFrameReader Я пытаюсь читать из таблицы RDS и записывать в таблицу Redshift.

Я могу читать из CSV и записывать в таблицу следующим образом:

df = spark.read \
        .format("com.databricks.spark.csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(s3_source_location)

jdbcconf = glueContext.extract_jdbc_conf(GLUE_CONNECTION_NAME)

df.write.format("jdbc") \
    .option("url", jdbcconf.get("url") + '/' + REDSHIFT_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
    .option("dbtable", REDSHIFT_TABLE_NAME) \
    .option("tempdir", args["TempDir"]) \
    .partitionBy("Ingestion_Date") \
    .mode("overwrite") \
    .save()

Однако вместо чтения из CSV я хочу читать из RDS, и я пытаюсь какследует:

RDS_CONNECTION_NAME = args['GlueConnectionRDS']
RDS_DATABASE = args['RDSDatabase']
RDS_TABLE_NAME = args['RDSTable']

jdbcconf = glueContext.extract_jdbc_conf(RDS_CONNECTION_NAME)

df  = spark.read.format("jdbc") \
    .option("url", jdbcconf.get("url") + '/' + RDS_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
    .option("dbtable", RDS_TABLE_NAME)

REDSHIFT_CONNECTION_NAME = args['GlueConnectionRedshift']
REDSHIFT_DATABASE = args['RedshiftDatabase']
REDSHIFT_TABLE_NAME = args['RedshiftTable']

jdbcconf = glueContext.extract_jdbc_conf(REDSHIFT_CONNECTION_NAME)

df.write.format("jdbc") \
    .option("url", jdbcconf.get("url") + '/' + REDSHIFT_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password')) \
    .option("dbtable", REDSHIFT_TABLE_NAME) \
    .option("tempdir", args["TempDir"]) \
    .mode("overwrite") \
    .save()

Логически я ожидал бы, что это сработает, однако я должен что-то упустить в DataFrameReader.Я получаю сообщение об ошибке:

AttributeError: у объекта 'DataFrameReader' нет атрибута 'write'.

Если я пытаюсь прочитать с помощью вызова .load() и драйвера следующим образом:

df  = spark.read.format("jdbc") \
    .option("url", jdbcconf.get("url") + '/' + RDS_DATABASE + '?user=' + jdbcconf.get('user') + '&password=' + jdbcconf.get('password') + '/') \
    .option("dbtable", RDS_TABLE_NAME) \
    .option("inferSchema", "true") \
    .option("driver", 'org.postgresql.Driver') \
    .load()

Я получаю ошибку аутентификации, хотя я знаю, что мое соединение работает.Как правильно это сделать?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...