I am new to Scala/Java programming.
I need to create a Scala jdbcTemplate that maps multiple columns to an SQL query against a PostgreSQL database.
My insert query contains about 80 columns, something like this:
INSERT into schema.table_one (cov_eff_dt,cov_canc_dt,cov_pd_thru_dt,ebill_dt,retro_elig_recv_dt,retro_orig_cov_eff_dt,retro_orig_cov_canc_dt,cobra_eff_dt,elig_grc_prd_thru_dt,lst_prem_pd_dt,pol_ren_dt,partn_nbr,xref_id_partn_nbr,cnsm_id,prfl_id,src_cdb_xref_id,cos_pnl_nbr,src_tmstmp,row_tmstmp,created_dttm,updt_dttm,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd,cos_div_cd,mkt_typ_cd,cos_grp_nbr,lgcy_prdt_typ_cd,lgcy_prdt_cd,cov_lvl_typ_cd,shr_arng_cd,shr_arng_oblig_cd,lgcy_pln_var_cd,lgcy_rpt_cd,prdt_srvc_typ_cd,ee_sts_typ_cd,govt_pgm_typ_cd,clm_sys_typ_cd,elig_sys_typ_cd,ces_grp_nbr,mkt_site_cd,row_sts_cd,medica_trvlben_ind,row_user_id,sec_typ_cd,cancel_rsn_typ_cd,cov_pd_thru_rsn_cd,list_bill_typ_cd,billing_sufx_cd,billing_subgrp_nbr,retro_days,retro_typ_cd,retro_ovrd_typ_cd,tops_cov_lvl_typ_cd,lgcy_ben_pln_id,lgcy_prdt_id,rr_ben_grp_nbr,rr_ben_grp_cho_cd,rr_br_cd,rr_un_cd,rr_optout_plan_ind,updt_typ_cd,racf_id,prr_cov_mo,fund_typ_cd,state_of_issue_cd,cobra_mo,cobra_qual_evnt_cd,grndfathered_pol_ind,deriv_cov_ind,cnsm_lgl_enty_nm,indv_grp_typ_cd,src_cov_mnt_typ_cd,pbp_cd,h_cntrct_id,risk_typ_cd,bil_typ_cd,rate_cov_typ_cd,plan_cd,seg_id,src_sys_id) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (cov_eff_dt,xref_id_partn_nbr,src_cdb_xref_id,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd)
DO UPDATE SET cov_canc_dt= ?,cov_pd_thru_dt= ?,ebill_dt= ?,retro_elig_recv_dt= ?,retro_orig_cov_eff_dt= ?,retro_orig_cov_canc_dt= ?,cobra_eff_dt= ?,elig_grc_prd_thru_dt= ?,lst_prem_pd_dt= ?,pol_ren_dt= ?,partn_nbr= ?,prfl_id= ?,cnsm_id= ?,cos_pnl_nbr= ?,src_tmstmp= ?,row_tmstmp= ?,updt_dttm= ?,cos_div_cd= ?,mkt_typ_cd= ?,cos_grp_nbr= ?,lgcy_prdt_typ_cd= ?,lgcy_prdt_cd= ?,cov_lvl_typ_cd= ?,shr_arng_cd= ?,shr_arng_oblig_cd= ?,lgcy_pln_var_cd= ?,lgcy_rpt_cd= ?,prdt_srvc_typ_cd= ?,ee_sts_typ_cd= ?,govt_pgm_typ_cd= ?,clm_sys_typ_cd= ?,elig_sys_typ_cd= ?,ces_grp_nbr= ?,mkt_site_cd= ?,row_sts_cd= ?,medica_trvlben_ind= ?,row_user_id= ?,sec_typ_cd= ?,cancel_rsn_typ_cd= ?,cov_pd_thru_rsn_cd= ?,list_bill_typ_cd= ?,billing_sufx_cd= ?,billing_subgrp_nbr= ?,retro_days= ?,retro_typ_cd= ?,retro_ovrd_typ_cd= ?,tops_cov_lvl_typ_cd= ?,lgcy_ben_pln_id= ?,lgcy_prdt_id= ?,rr_ben_grp_nbr= ?,rr_ben_grp_cho_cd= ?,rr_br_cd= ?,rr_un_cd= ?,rr_optout_plan_ind= ?,updt_typ_cd= ?,racf_id= ?,prr_cov_mo= ?,fund_typ_cd= ?,state_of_issue_cd= ?,cobra_mo= ?,cobra_qual_evnt_cd= ?,grndfathered_pol_ind= ?,deriv_cov_ind= ?,cnsm_lgl_enty_nm= ?,indv_grp_typ_cd= ?,src_cov_mnt_typ_cd= ?,pbp_cd= ?,h_cntrct_id= ?,risk_typ_cd= ?,bil_typ_cd= ?,rate_cov_typ_cd= ?,plan_cd= ?,seg_id= ?,src_sys_id= ?
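Since typing 80 column names and their placeholders by hand is error-prone, I assume the column list and the question-mark placeholders could be generated from a plain list of column names, roughly like this sketch (illustrative only, not my actual code; the ON CONFLICT / DO UPDATE SET part would still need its own list):

// Rough sketch (assumption, not my real code): build the "(col1,...,colN) VALUES (?,...,?)"
// part of the INSERT from a list of column names, so the 80-column statement does not
// have to be typed by hand. Assumes the names match the table columns exactly.
def insertClause(schemaTable: String, cols: Seq[String]): String = {
  val placeholders = cols.map(_ => "?").mkString(",")
  s"INSERT INTO $schemaTable (${cols.mkString(",")}) VALUES ($placeholders)"
}

// e.g. insertClause("schema.table_one", Seq("cov_eff_dt", "cov_canc_dt", ...))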
The data that needs to go into the "?" placeholders is stored in another DataFrame named inputdatafiledfwindow.
The mapping for the columns, i.e. the calls that set the values on the prepared statement, has to be generated dynamically. Here is what the hard-coded version looks like:
val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")
inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
batch.foreach { row => {
st.setShort(1, row.getShort(0))
st.setInt(2, row.getInt(1))
st.setString(3, row.getString(2).replaceAll("\\000", ""))
st.setString(4, row.getString(3).replaceAll("\\000", ""))
st.setString(5, row.getString(4).replaceAll("\\000", ""))
st.setString(6, row.getString(5).replaceAll("\\000", ""))
st.setDate(7, row.getDate(6))
st.setDate(8, row.getDate(7))
st.setString(9, row.getString(8).replaceAll("\\000", ""))
st.setString(10, row.getString(9).replaceAll("\\000", ""))
st.setString(11, row.getString(10).replaceAll("\\000", ""))
st.setString(12, row.getString(11).replaceAll("\\000", ""))
st.setString(13, row.getString(12).replaceAll("\\000", ""))
st.setString(14, row.getString(13).replaceAll("\\000", ""))
st.setString(15, row.getString(14).replaceAll("\\000", ""))
st.setString(16, row.getString(15).replaceAll("\\000", ""))
st.setString(17, row.getString(16).replaceAll("\\000", ""))
st.setString(18, row.getString(17).replaceAll("\\000", ""))
st.setString(19, row.getString(18).replaceAll("\\000", ""))
st.setString(20, row.getString(19).replaceAll("\\000", ""))
st.setString(21, row.getString(20).replaceAll("\\000", ""))
st.setString(22, row.getString(21).replaceAll("\\000", ""))
st.setString(23, row.getString(22).replaceAll("\\000", ""))
st.setString(24, row.getString(23).replaceAll("\\000", ""))
st.setString(25, row.getString(24).replaceAll("\\000", ""))
st.setString(26, row.getString(25).replaceAll("\\000", ""))
st.addBatch()
}
}
st.executeBatch()
})
dbc.close()
})
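Conceptually, the dynamic mapping I am after would pick the setter based on each column's Spark DataType, something like this sketch (illustrative only; it does not handle nulls or any other types yet):

import java.sql.PreparedStatement
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Sketch of the idea: choose the PreparedStatement setter from the column's
// Spark DataType instead of hard-coding one setter line per column.
def bindRow(st: PreparedStatement, row: Row, fields: Array[StructField]): Unit = {
  fields.zipWithIndex.foreach { case (field, i) =>
    field.dataType match {
      case ShortType     => st.setShort(i + 1, row.getShort(i))
      case IntegerType   => st.setInt(i + 1, row.getInt(i))
      case StringType    => st.setString(i + 1, row.getString(i))
      case TimestampType => st.setTimestamp(i + 1, row.getTimestamp(i))
      case DateType      => st.setDate(i + 1, row.getDate(i))
      case DoubleType    => st.setDouble(i + 1, row.getDouble(i))
      case other         => throw new IllegalArgumentException(s"Unhandled column type: $other")
    }
  }
}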
Currently I am trying something like this:
val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)
inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
val columnNames_br = sc.broadcast(inputdatafiledfwindow.columns)
val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString))
val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
batch.foreach { row => {
for (i<-0 to columnNames.length-1) {
if (columnDataTypes(i) == "ShortType")
st.setShort((i+1).toInt, row.getShort(i))
else if(columnDataTypes(i)== "IntegerType")
st.setInt((i+1).toInt,row.getInt(i))
else if (columnDataTypes(i)=="StringType")
st.setString((i+1).toInt,row.getString(i))
else if(columnDataTypes(i)=="TimestampType")
st.setTimestamp((i+1).toInt, row.getTimestamp(i))
else if(columnDataTypes(i)=="DateType")
st.setDate((i+1).toInt,row.getDate(i))
else if (columnDataTypes(i)=="DoubleType")
st.setDouble((i+1).toInt, row.getDouble(i))
}
st.addBatch()
}
}
st.executeBatch()
})
dbc.close()
})
And this gives me: org.apache.spark.SparkException: Task not serializable
Any ideas or resources I could use to implement this? I know it is possible in Java, but I have not worked much in either Java or Scala.
Edit: I tried using the broadcast variables inside foreachPartition, but I am still getting org.apache.spark.SparkException: Task not serializable.
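If the broadcast variables have to be created on the driver rather than inside foreachPartition, I assume the setup would look roughly like this (sketch only, reusing my variable names from above):

import java.sql.DriverManager

// Sketch (my assumption of the intended pattern): the broadcasts are created once
// on the driver, and only their .value is read inside the partition closure.
val columnNames_br     = sc.broadcast(inputdatafiledfwindow.columns)
val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(_.dataType.toString))

inputdatafiledfwindow.coalesce(10).foreachPartition { partition =>
  val dbc = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
  val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
  // ... same grouped(50) batching and setter loop as above, but reading
  // columnNames_br.value / columnDataTypes_br.value instead of the plain arrays ...
  dbc.close()
}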
Below is the full exception stack:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2343)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:957)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:956)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
... 80 elided
Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection
Serialization stack:
- object not serializable (class: org.postgresql.jdbc.PgConnection, value: org.postgresql.jdbc.PgConnection@71c7a55b)
- field (class: $iw, name: dbc, type: interface java.sql.Connection)
- object (class $iw, $iw@22459ca5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@788dd40c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@31c725ed)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4a367987)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7cffd7ab)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3c615880)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@289fa6c2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2a5a0934)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4a04a12a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@c5fe90a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@58b67f02)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@243a4a22)
- field (class: $line77.$read, name: $iw, type: class $iw)
- object (class $line77.$read, $line77.$read@5f473976)
- field (class: $iw, name: $line77$read, type: class $line77.$read)
- object (class $iw, $iw@70fc6803)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@26818b0)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 98 more
Update 2: I made the changes suggested by @RamGhadiyaram, but now I am running into a new NullPointerException. I don't see where I am going wrong; surely it should be a simple fix.
Below is the updated code:
val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")
val updatequeryforInsertAndUpdate = "INSERT INTO " + schema + table_name + updatequery + " where " + schema + table_name + s".row_tmstmp < '2020-02-17 00:00:00' OR ${table_name}.row_tmstmp < ?"
val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)
inputdatafiledfwindow.foreachPartition(partition => {
val columnNames_br = sc.broadcast(columnNames)
val columnDataTypes_br = sc.broadcast(columnDataTypes)
val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)
val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
batch.foreach { row => {
for (i<-0 to columnNames_br.value.length-1) {
if (columnDataTypes_br.value(i) == "ShortType")
st.setShort((i+1), row.getShort(i))
else if(columnDataTypes_br.value(i)== "IntegerType")
st.setInt((i+1),row.getInt(i))
else if (columnDataTypes_br.value(i)=="StringType")
st.setString((i+1),row.getString(i))
else if(columnDataTypes_br.value(i)=="TimestampType")
st.setTimestamp((i+1), row.getTimestamp(i))
else if(columnDataTypes_br.value(i)=="DateType")
st.setDate((i+1),row.getDate(i))
else if (columnDataTypes_br.value(i)=="DoubleType")
st.setDouble((i+1), row.getDouble(i))
}
st.addBatch()
}
}
st.executeBatch()
})
dbc.close()
})
and below is the new exception stack:
20/03/25 11:12:49 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 176, dbslp1102.uhc.com, executor 4): java.lang.NullPointerException
at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:87)
at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:86)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/03/25 11:12:49 ERROR TaskSetManager: Task 0 in stage 19.0 failed 4 times; aborting job
20/03/25 11:12:49 WARN TaskSetManager: Lost task 11.2 in stage 19.0 (TID 210, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 4.3 in stage 19.0 (TID 204, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 7.1 in stage 19.0 (TID 201, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 2.2 in stage 19.0 (TID 205, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 12.2 in stage 19.0 (TID 206, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 10.2 in stage 19.0 (TID 207, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 3.2 in stage 19.0 (TID 209, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 13.0 in stage 19.0 (TID 208, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 8.3 in stage 19.0 (TID 202, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 203, dbslp1102.uhc.com, executor 5): java.lang.NullPointerException
at $anonfun$1.apply(<console>:87)
at $anonfun$1.apply(<console>:86)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
... 86 elided
Caused by: java.lang.NullPointerException
at $anonfun$1.apply(<console>:87)
at $anonfun$1.apply(<console>:86)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
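One more thought: I wonder whether the NullPointerException comes from calling the typed getters (row.getShort, row.getInt, ...) on columns that are null in some rows. If that is the case, I assume the binding loop would need a per-column null check, roughly like this untested sketch:

import java.sql.{PreparedStatement, Types}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Untested sketch: skip the typed getter when the value is null and call
// setNull with the matching java.sql.Types code instead.
def bindRowNullSafe(st: PreparedStatement, row: Row, fields: Array[StructField]): Unit = {
  fields.zipWithIndex.foreach { case (field, i) =>
    val pos = i + 1
    if (row.isNullAt(i)) {
      val sqlType = field.dataType match {
        case ShortType     => Types.SMALLINT
        case IntegerType   => Types.INTEGER
        case StringType    => Types.VARCHAR
        case TimestampType => Types.TIMESTAMP
        case DateType      => Types.DATE
        case DoubleType    => Types.DOUBLE
        case _             => Types.OTHER
      }
      st.setNull(pos, sqlType)
    } else {
      // non-null case: same type-based dispatch as in the earlier sketch
      field.dataType match {
        case ShortType     => st.setShort(pos, row.getShort(i))
        case IntegerType   => st.setInt(pos, row.getInt(i))
        case StringType    => st.setString(pos, row.getString(i))
        case TimestampType => st.setTimestamp(pos, row.getTimestamp(i))
        case DateType      => st.setDate(pos, row.getDate(i))
        case DoubleType    => st.setDouble(pos, row.getDouble(i))
        case other         => throw new IllegalArgumentException(s"Unhandled column type: $other")
      }
    }
  }
}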