Я пытаюсь переместить данные из GP в Hive с помощью искрового соединителя greenplum jar: greenplum-spark_2.11-1.3.0.jar.Я читаю таблицу, как показано ниже:
val conf = new SparkConf().setAppName("Data_Read").set("spark.executor.heartbeatInterval", "1200s").set("spark.network.timeout", "12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.shuffle.compress", "true").set("spark.shuffle.spill.compress", "true").set("spark.sql.orc.filterPushdown", "true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max", "512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown", "true").set("spark.yarn.driver.memoryOverhead", "8192").set("spark.yarn.executor.memoryOverhead", "8192").set("spark.dynamicAllocation.enabled", "false").set("spark.shuffle.service.enabled", "true").set("spark.sql.tungsten.enabled", "true").set("spark.executor.instances", "1").set("spark.executor.memory", "1g").set("spark.executor.cores", "1").set("spark.files.maxPartitionBytes", "268435468").set("spark.sql.shuffle.partitions", "400")
try {
Class.forName("io.pivotal.greenplum.spark.GreenplumRelationProvider").newInstance()
}
catch {
case cnf: ClassNotFoundException =>
println("No class def found. Killing the application...." + cnf.printStackTrace())
System.exit(1)
case e: Exception =>
println("No class def found. Killing the application...." + e.printStackTrace())
System.exit(1)
}
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
val yearDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", "url").option("server.port","port").option("dbtable", "tablename").option("dbschema","schema").option("user", "userID").option("password", "12345678").option("partitionColumn","headers").option("partitions",64).load().where("period_year=2017 and period_num=12 and source_system_name='BANKERS'").select(splitSeq map col:_*).withColumn("delCol", lit(0))
dataDF.write.format("csv").save("hdfs://location_of_the_file_on_HDFS")
Я вижу, что запросы ниже также влияют на GP.
INSERT INTO "schema"."spark_c04d88c2015291d9_38ee758b9192317b_driver_1016"
(creation_date,attribute50,headers,attribute12,drill_line_pk_id,sap_partner_func_area,attribute38,global_attribute2,global_attribute4,usd_mor_activity_amount,attribute5,journal_line,drill_debug_info,reference_8,last_updated_by,sap_reference,document_type,attribute47,attribute48,last_update_tms,created_by_name)
SELECT creation_date,attribute50,headers,attribute12,drill_line_pk_id,sap_partner_func_area,attribute38,global_attribute2,global_attribute4,usd_mor_activity_amount,attribute5,journal_line,drill_debug_info,reference_8,last_updated_by,sap_reference,document_type,attribute47,attribute48,last_update_tms,created_by_name
FROM "schema"."table" WHERE ("period_year" IS NOT NULL AND "period_num" IS NOT NULL AND "period_num" = 12 AND "source_system_name" = 'BANKERS' AND "period_year" = 2017)
AND (gp_segment_id = 48)
Но задание выполняется некоторое время и без какого-либо надлежащего результата / прогресса, заканчивается исключением: java.util.NoSuchElementException: None.get
.Сообщение о полном стеке исключений можно увидеть ниже.В наших базах данных greenplum есть 64 сегмента, и поэтому он создает 64 задачи, которые можно увидеть ниже.
Exception Message:
[Stage 5:> (0 + 48) / 64]18/12/27 10:29:10 WARN TaskSetManager: Lost task 6.0 in stage 5.0 (TID 11, executor 11): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:43)
at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:110)
at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:109)
at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Используется Spark-submit:
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.PartitionData --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/devuser/jars/greenplum-spark_2.11-1.3.0.jar:/home/devuser/jars/postgresql-42.1.4.jar --conf spark.jars=/home/devuser/jars/greenplum-spark_2.11-1.3.0.jar,/home/devuser/jars/postgresql-42.1.4.jar --executor-cores 3 --executor-memory 13G --keytab /home/devuser/devuser.keytab --principal devuser@DEVENV.COM --files /usr/hdp/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/devuser/jars/greenplum-spark_2.11-1.3.0.jar splinter_2.11-0.1.jar
Файл jar: greenplum-spark_2.11-1.3.0.jar использует протокол gpfdist.Я использую класс драйвера, упомянутый в основной документации greenplum документация .Я пытался отладить проблему столько, сколько смог, но я не понимаю, в чем дело.Кто-нибудь может дать мне знать, какую ошибку я здесь делаю?