Я хочу сохранить данные для воспламенения через фрейм данных spark. Когда я отправляю программу, используя spark on yarn, я всегда получаю следующую ошибку. Я не понимаю, о чем это спрашивает, где я делаю это неправильно?
df.repartition(new Column("id")).write
.format(IgniteDataFrameSettings.FORMAT_IGNITE)
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, properties.getIgniteConfigPath)
.option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, true)
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "groupcode,openid,laststorecode,consumetime")
.option(IgniteDataFrameSettings.OPTION_STREAMER_FLUSH_FREQUENCY, 10000)
.option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1024)
.option(IgniteDataFrameSettings.OPTION_TABLE, ignite_table_name)
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned,backups=0,CACHE_NAME=" + ignite_table_name)
.mode(SaveMode.Overwrite).save()
сценарий spark-submit:
/data/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --queue group_test --class cn.com.mainclass --name CustomerCareDataProcess --master yarn --deploy-mode client --conf spark.default.parallelism=50 --conf spark.sql.shuffle.partitions=50 --conf spark.sql.auto.repartition=true --conf spark.executor.extraClassPath=/data/ignite/* --conf spark.driver.extraClassPath=/data/ignite/* --conf spark.driver.extraJavaOptions=-Dfile.encoding=utf8 --conf spark.executor.extraJavaOptions=-Dfile.encoding=utf8 --num-executors 4 --executor-memory 2G --total-executor-cores 4 --jars /data/ignite/customer-2.0-SNAPSHOT-jar-with-dependencies.jar /data/ignite/customer-2.0-SNAPSHOT.jar
Наконец-то появилась следующая ошибка:
Caused by: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.ignite.spark.impl.QueryHelper$.org$apache$ignite$spark$impl$QueryHelper$$savePartition(QueryHelper.scala:155)
at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:117)
at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:116)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Я не понимаю, что не так, и кто может мне помочь ???