Ошибка загрузки задания склеивания ETL в снежинку - PullRequest
1 голос
/ 07 января 2020

Я пытаюсь загрузить данные из CSV-файлов S3 Buckets в снежинку с помощью клея ETL. Написал скрипт python в задании ETL для того же, что и ниже:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from py4j.java_gateway import java_import
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

    ## @params: [JOB_NAME, URL, ACCOUNT, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD]
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 
    'USERNAME', 'PASSWORD'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    java_import(spark._jvm, "net.snowflake.spark.snowflake")


    spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession 
     (spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
     sfOptions = {
     "sfURL" : args['URL'],
     "sfAccount" : args['ACCOUNT'],
     "sfUser" : args['USERNAME'],
     "sfPassword" : args['PASSWORD'],
     "sfDatabase" : args['DB'],
     "sfSchema" : args['SCHEMA'],
     "sfWarehouse" : args['WAREHOUSE'],
      }

     dyf = glueContext.create_dynamic_frame.from_catalog(database = "salesforcedb", table_name = 
     "pr_summary_csv", transformation_ctx = "dyf")
     df=dyf.toDF()
     ##df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("parallelism", 
     "8").option("dbtable", "abcdef").mode("overwrite").save()
     df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "abcdef").save()
     job.commit()

Сгенерирована ошибка:

Произошла ошибка при вызове o81.save. Указано неправильное имя пользователя или пароль.

Однако, если я не конвертирую в фрейм данных Spark и не использую непосредственно динамический c фрейм, я получаю ошибку, подобную этой:

AttributeError: у объекта 'function' нет атрибута 'format'

Может ли кто-нибудь просмотреть мой код и сказать, что я делаю неправильно для преобразования кадра Dynami c в DF? Пожалуйста, дайте мне знать, если мне нужно предоставить больше информации.

Кстати, я новичок ie в снежинке, и это мой тест по загрузке данных через AWS Glue. ?

Ответы [ 2 ]

0 голосов
/ 09 января 2020

Вот протестированный Glue Code (вы можете скопировать пасту, так как она только меняет имя таблицы), которую вы можете использовать для настройки Glue ETL. Вам нужно будет добавить банки JDB C и Spark. Вы можете использовать приведенную ниже ссылку для настройки: https://community.snowflake.com/s/article/How-To-Use-AWS-Glue-With-Snowflake


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from py4j.java_gateway import java_import
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake";

## @params: [JOB_NAME, URL, ACCOUNT, WAREHOUSE, DB, SCHEMA, USERNAME, PASSWORD]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


## uj = sc._jvm.net.snowflake.spark.snowflake
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
sfOptions = {
"sfURL" : args['URL'],
"sfAccount" : args['ACCOUNT'],
"sfUser" : args['USERNAME'],
"sfPassword" : args['PASSWORD'],
"sfDatabase" : args['DB'],
"sfSchema" : args['SCHEMA'],
"sfWarehouse" : args['WAREHOUSE'],
}

## Read from a Snowflake table into a Spark Data Frame
df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "Select * from <tablename>").load()
df.show()

## Perform any kind of transformations on your data and save as a new Data Frame: df1 = df.[Insert any filter, transformation, or other operation]
## Write the Data Frame contents back to Snowflake in a new table df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "[new_table_name]").mode("overwrite").save() job.commit()

0 голосов
/ 07 января 2020

ошибка при вызове o81.save. Указано неверное имя пользователя или пароль.

В сообщении об ошибке указывается, что произошла ошибка в отношении пользователя или пароля. Если вы уверены, что имя пользователя и пароль верны, убедитесь, что имя учетной записи и URL-адрес снежинки также верны.

Однако, если я не преобразую в фрейм данных Spark, и использовать динамически c фрейм. Я получаю ошибку, подобную этой:

AttributeError: у объекта 'function' нет атрибута 'format'

Метод записи Glue DynamicFrame отличается от Spark DataFrame, поэтому нормально, чтобы не было одинаковых методов. Пожалуйста, проверьте документацию:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws -glue-api-crawler-pyspark-extensions-dynamici c -frame-write

Кажется, вам нужно задайте параметры как connection_options:

write(connection_type, connection_options, format, format_options, accumulator_size)

connection_options = {"url": "jdbc-url/database", "user": "username", "password": "password","dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"} 

Даже если вы используете DynamicFrame, вы, вероятно, получите неправильное имя пользователя или пароль. Поэтому я предлагаю вам сосредоточиться на исправлении учетных данных.

...