Что такое исправление для java.util.NoSuchElementException: None.get, который появляется при запуске приложения spark для перемещения данных в HDFS? - PullRequest
0 голосов
/ 05 февраля 2019

Я пытаюсь переместить данные из GP в Hive с помощью искрового соединителя greenplum jar: greenplum-spark_2.11-1.3.0.jar.Я читаю таблицу, как показано ниже:

  val conf = new SparkConf().setAppName("Data_Read").set("spark.executor.heartbeatInterval", "1200s").set("spark.network.timeout", "12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.shuffle.compress", "true").set("spark.shuffle.spill.compress", "true").set("spark.sql.orc.filterPushdown", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max", "512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown", "true").set("spark.yarn.driver.memoryOverhead", "8192").set("spark.yarn.executor.memoryOverhead", "8192").set("spark.dynamicAllocation.enabled", "false").set("spark.shuffle.service.enabled", "true").set("spark.sql.tungsten.enabled", "true").set("spark.executor.instances", "1").set("spark.executor.memory", "1g").set("spark.executor.cores", "1").set("spark.files.maxPartitionBytes", "268435468").set("spark.sql.shuffle.partitions", "400")
  try {
    Class.forName("io.pivotal.greenplum.spark.GreenplumRelationProvider").newInstance()
  }
  catch {
    case cnf: ClassNotFoundException =>
      println("No class def found. Killing the application...." + cnf.printStackTrace())
      System.exit(1)
    case e: Exception =>
      println("No class def found. Killing the application...." + e.printStackTrace())
      System.exit(1)
  }
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", "url").option("server.port","port").option("dbtable", "tablename").option("dbschema","schema").option("user", "userID").option("password", "12345678").option("partitionColumn","headers").option("partitions",64).load().where("period_year=2017 and period_num=12 and source_system_name='BANKERS'").select(splitSeq map col:_*).withColumn("delCol", lit(0))

  dataDF.write.format("csv").save("hdfs://location_of_the_file_on_HDFS")

Я вижу, что запросы ниже также влияют на GP.

INSERT INTO "schema"."spark_c04d88c2015291d9_38ee758b9192317b_driver_1016" 
(creation_date,attribute50,headers,attribute12,drill_line_pk_id,sap_partner_func_area,attribute38,global_attribute2,global_attribute4,usd_mor_activity_amount,attribute5,journal_line,drill_debug_info,reference_8,last_updated_by,sap_reference,document_type,attribute47,attribute48,last_update_tms,created_by_name) 
SELECT creation_date,attribute50,headers,attribute12,drill_line_pk_id,sap_partner_func_area,attribute38,global_attribute2,global_attribute4,usd_mor_activity_amount,attribute5,journal_line,drill_debug_info,reference_8,last_updated_by,sap_reference,document_type,attribute47,attribute48,last_update_tms,created_by_name 
FROM "schema"."table" WHERE ("period_year" IS NOT NULL AND "period_num" IS NOT NULL AND "period_num" = 12 AND "source_system_name" = 'BANKERS' AND "period_year" = 2017) 
AND (gp_segment_id = 48)

Но задание выполняется некоторое время и без какого-либо надлежащего результата / прогресса, заканчивается исключением: java.util.NoSuchElementException: None.get.Сообщение о полном стеке исключений можно увидеть ниже.В наших базах данных greenplum есть 64 сегмента, и поэтому он создает 64 задачи, которые можно увидеть ниже.

Exception Message:
[Stage 5:> (0 + 48) / 64]18/12/27 10:29:10 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 11, executor 11): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:43)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:110)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:109)
    at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Используется Spark-submit:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.PartitionData --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/devuser/jars/greenplum-spark_2.11-1.3.0.jar:/home/devuser/jars/postgresql-42.1.4.jar --conf spark.jars=/home/devuser/jars/greenplum-spark_2.11-1.3.0.jar,/home/devuser/jars/postgresql-42.1.4.jar --executor-cores 3 --executor-memory 13G --keytab /home/devuser/devuser.keytab --principal devuser@DEVENV.COM --files /usr/hdp/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/devuser/jars/greenplum-spark_2.11-1.3.0.jar splinter_2.11-0.1.jar

Файл jar: greenplum-spark_2.11-1.3.0.jar использует протокол gpfdist.Я использую класс драйвера, упомянутый в основной документации greenplum документация .Я пытался отладить проблему столько, сколько смог, но я не понимаю, в чем дело.Кто-нибудь может дать мне знать, какую ошибку я здесь делаю?

1 Ответ

0 голосов
/ 05 февраля 2019

Какую версию greenplum вы используете?Кроме того, вы пробовали последнюю версию (v.1.6) GP Spark Connector?Последняя версия улучшает использование памяти и способ создания разделов.

Я заметил, что ваш DF использует параметр partitions, который был представлен только в версии 1.6.В версии 1.3 поддерживается опция partitionsPerSegment, которая по умолчанию равна 1. Вы можете попробовать значение partitionsPerSegment 2 и посмотреть, есть ли какая-либо разница.

val yearDF = spark.read
.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
...
.option("partitions", 64)
.load()...

Надеюсь, эта информация поможет, пожалуйста, попробуйте мойпредложение выше или последняя версия (1.6) разъема Spark Greenplum.

...