Spark DataFrame записывает данные в Ignite (2.7.5) всегда сообщает об ошибке - PullRequest
1 голос
/ 10 февраля 2020

Я хочу сохранить данные для воспламенения через фрейм данных spark. Когда я отправляю программу, используя spark on yarn, я всегда получаю следующую ошибку. Я не понимаю, о чем это спрашивает, где я делаю это неправильно?

df.repartition(new Column("id")).write
  .format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, properties.getIgniteConfigPath)
  .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, true)
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "groupcode,openid,laststorecode,consumetime")
  .option(IgniteDataFrameSettings.OPTION_STREAMER_FLUSH_FREQUENCY, 10000)
  .option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1024)
  .option(IgniteDataFrameSettings.OPTION_TABLE, ignite_table_name)
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned,backups=0,CACHE_NAME=" + ignite_table_name)
  .mode(SaveMode.Overwrite).save()

сценарий spark-submit:

/data/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --queue group_test --class cn.com.mainclass --name CustomerCareDataProcess --master yarn --deploy-mode client  --conf spark.default.parallelism=50 --conf spark.sql.shuffle.partitions=50 --conf spark.sql.auto.repartition=true --conf spark.executor.extraClassPath=/data/ignite/* --conf spark.driver.extraClassPath=/data/ignite/* --conf spark.driver.extraJavaOptions=-Dfile.encoding=utf8 --conf spark.executor.extraJavaOptions=-Dfile.encoding=utf8 --num-executors 4 --executor-memory 2G --total-executor-cores 4  --jars /data/ignite/customer-2.0-SNAPSHOT-jar-with-dependencies.jar /data/ignite/customer-2.0-SNAPSHOT.jar

Наконец-то появилась следующая ошибка:

Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.ignite.spark.impl.QueryHelper$.org$apache$ignite$spark$impl$QueryHelper$$savePartition(QueryHelper.scala:155)
    at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:117)
    at org.apache.ignite.spark.impl.QueryHelper$$anonfun$saveTable$1.apply(QueryHelper.scala:116)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Я не понимаю, что не так, и кто может мне помочь ???

...