Работа Snowflake ETL с использованием PySpark работает на локальном, но не на Dataproc - PullRequest
1 голос
/ 27 февраля 2020

Я создал искровую работу и сначала протестировал ее на локальном компьютере, с которой она работала безупречно. Однако после передачи искрового задания в Datapro c возвращается следующая ошибка:

py4j.protocol.Py4JJavaError: An error occurred while calling o78.load.
: net.snowflake.client.jdbc.SnowflakeSQLException: Private key provided is invalid or not supported: Please use java.security.interfaces.RSAPrivateCrtKey.class

Код Pyspark

sfOptions = ConnectToSnowflake(creds_path='creds.json').get_spark_sf_creds()

spark = SparkSession \
    .builder \
    .config("spark.jars", "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "jars/snowflake-jdbc-3.8.0.jar,jars/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

snowflake_source_name = 'net.snowflake.spark.snowflake'

df = spark.read.format(snowflake_source_name) \
    .options(**sfOptions) \
    .option("query", query) \
    .load()

df.show()

Datapro c

gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
    --jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar \
    --py-files snowflake_connector.py \
    --files creds.json,daodl_access.json \
    sf_loader.py

Полная ошибка трассировки стека

net.snowflake.client.jdbc.SnowflakeSQLException: Private key provided is invalid or not supported: Please use java.security.interfaces.RSAPrivateCrtKey.class
        at net.snowflake.client.jdbc.SnowflakeConnectionV1.<init>(SnowflakeConnectionV1.java:136)
        at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:148)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at net.snowflake.spark.snowflake.JDBCWrapper.getConnector(SnowflakeJDBCWrapper.scala:180)
        at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:56)
        at net.snowflake.spark.snowflake.SnowflakeRelation$$anonfun$schema$1.apply(SnowflakeRelation.scala:53)
        at scala.Option.getOrElse(Option.scala:121)
        at net.snowflake.spark.snowflake.SnowflakeRelation.schema$lzycompute(SnowflakeRelation.scala:53)
        at net.snowflake.spark.snowflake.SnowflakeRelation.schema(SnowflakeRelation.scala:52)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:403)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)
        at $line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)
        at $line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:43)
        at $line17.$read$$iw$$iw$$iw.<init>(<console>:45)
        at $line17.$read$$iw$$iw.<init>(<console>:47)
        at $line17.$read$$iw.<init>(<console>:49)
        at $line17.$read.<init>(<console>:51)
        at $line17.$read$.<init>(<console>:55)
        at $line17.$read$.<clinit>(<console>)
        at $line17.$eval$.$print$lzycompute(<console>:7)
        at $line17.$eval$.$print(<console>:6)
        at $line17.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:819)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:691)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
        at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:425)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:285)
        at org.apache.spark.repl.SparkILoop.runClosure(SparkILoop.scala:159)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:182)
        at org.apache.spark.repl.Main$.doMain(Main.scala:78)
        at org.apache.spark.repl.Main$.main(Main.scala:58)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

1 Ответ

0 голосов
/ 22 марта 2020

Чтобы передать ключ creds.json, вы должны использовать флаг --jars вместо флага --files (таким образом, он будет добавлен в classpath):

gcloud dataproc jobs submit pyspark --cluster featurelib-cluster \
    --jars gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar,creds.json \
    --py-files snowflake_connector.py \
    --files daodl_access.json \
    sf_loader.py

Если он не работает, попытайтесь обновить банки Снежинки до последних версий.

...