получение таблицы 'NM_TEMP_STAGING_1100952600' не существует с использованием aws клея и снежинки - PullRequest
0 голосов
/ 29 января 2020

Я использую клейкую работу для записи конвейера данных. Я взял код от сообщества, который выглядит следующим образом

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from py4j.java_gateway import java_import
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake" 
#args = getResolvedOptions(sys.argv, ['JOB_NAME'])
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'URL', 'ACCOUNT', 'WAREHOUSE', 'DB', 'SCHEMA', 'USERNAME', 'PASSWORD', 'ROLE'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session


glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)

##Use the CData JDBC driver to read Snowflake data from the Products table into a DataFrame
##Note the populated JDBC URL and driver class name

java_import(sparkSession._jvm, SNOWFLAKE_SOURCE_NAME)
sparkSession._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(sparkSession._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
tmp_dir=args["TempDir"]
sfOptions = {
"sfURL" : args['URL'],
"sfAccount" : args['ACCOUNT'],
"sfUser" : args['USERNAME'],
"sfPassword" : args['PASSWORD'],
"sfDatabase" : args['DB'],
"sfSchema" : args['SCHEMA'],
"sfRole" : args['ROLE'],
"sfWarehouse" : args['WAREHOUSE'],
"preactions" : "USE DATABASE dev_lz;",
}

#"tempDir" : tmp_dir,

print('=========DB Connection details ================== ', sfOptions)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "aws-nonprod-datalake-glue-catalog", table_name = "nm_s_amaster", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ mappings], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = [columns], transformation_ctx = "selectfields2")
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "aws-nonprod-datalake-glue-catalog", table_name = "NM_TEMP", transformation_ctx = "resolvechoice3")
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

##Convert DataFrames to AWS Glue's DynamicFrames Object

    resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("preactions","USE DATABASE dev_lz").option("dbtable", "nm_temp").mode("overwrite").save()


glueJob.commit()

Но после запуска кода я получаю net .snowflake.client.jdb c .SnowflakeSQLException: SQL Ошибка компиляции: Таблица 'NM_TEMP_STAGING_1100952600 'не существует

пожалуйста, дайте мне знать, если я что-то упустил. У меня есть разрешение для создать, выбрать этап, создать, выбрать таблицу и создать будущие таблицы . Код выше, я удалил столбцы и сопоставления. но оригинальный код доступен.

1 Ответ

0 голосов
/ 13 февраля 2020
resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("preactions","USE DATABASE dev_lz").option("dbtable", "nm_temp").mode("overwrite").save()

Добавлено следующее в вышеупомянутой опции dbtable, она начала работать,

.option("preactions","USE ROLE DEVELOPER;USE DATABASE dev_db;USE SCHEMA aws_test")

следующим образом

resolvechoice4.toDF().write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("preactions","USE DATABASE dev_lz").option("preactions","USE ROLE DEVELOPER;USE DATABASE dev_db;USE SCHEMA aws_test").option("dbtable", "nm_temp").mode("overwrite").save()
...