Scala Upserts с использованием jdb c подготовленного состояния и foreachPartition - PullRequest
0 голосов
/ 23 марта 2020

Я новичок в scala / java программирование

Мне нужно создать шаблон scala jdbcTemplate для сопоставления нескольких столбцов с запросом SQL для базы данных PostgreSQL.

Мой запрос вставки содержит около 80 столбцов.

примерно так:

INSERT into schema.table_one (cov_eff_dt,cov_canc_dt,cov_pd_thru_dt,ebill_dt,retro_elig_recv_dt,retro_orig_cov_eff_dt,retro_orig_cov_canc_dt,cobra_eff_dt,elig_grc_prd_thru_dt,lst_prem_pd_dt,pol_ren_dt,partn_nbr,xref_id_partn_nbr,cnsm_id,prfl_id,src_cdb_xref_id,cos_pnl_nbr,src_tmstmp,row_tmstmp,created_dttm,updt_dttm,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd,cos_div_cd,mkt_typ_cd,cos_grp_nbr,lgcy_prdt_typ_cd,lgcy_prdt_cd,cov_lvl_typ_cd,shr_arng_cd,shr_arng_oblig_cd,lgcy_pln_var_cd,lgcy_rpt_cd,prdt_srvc_typ_cd,ee_sts_typ_cd,govt_pgm_typ_cd,clm_sys_typ_cd,elig_sys_typ_cd,ces_grp_nbr,mkt_site_cd,row_sts_cd,medica_trvlben_ind,row_user_id,sec_typ_cd,cancel_rsn_typ_cd,cov_pd_thru_rsn_cd,list_bill_typ_cd,billing_sufx_cd,billing_subgrp_nbr,retro_days,retro_typ_cd,retro_ovrd_typ_cd,tops_cov_lvl_typ_cd,lgcy_ben_pln_id,lgcy_prdt_id,rr_ben_grp_nbr,rr_ben_grp_cho_cd,rr_br_cd,rr_un_cd,rr_optout_plan_ind,updt_typ_cd,racf_id,prr_cov_mo,fund_typ_cd,state_of_issue_cd,cobra_mo,cobra_qual_evnt_cd,grndfathered_pol_ind,deriv_cov_ind,cnsm_lgl_enty_nm,indv_grp_typ_cd,src_cov_mnt_typ_cd,pbp_cd,h_cntrct_id,risk_typ_cd,bil_typ_cd,rate_cov_typ_cd,plan_cd,seg_id,src_sys_id) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT (cov_eff_dt,xref_id_partn_nbr,src_cdb_xref_id,src_cd,lgcy_pol_nbr,lgcy_src_id,cov_typ_cd) 
DO  UPDATE SET cov_canc_dt= ?,cov_pd_thru_dt= ?,ebill_dt= ?,retro_elig_recv_dt= ?,retro_orig_cov_eff_dt= ?,retro_orig_cov_canc_dt= ?,cobra_eff_dt= ?,elig_grc_prd_thru_dt= ?,lst_prem_pd_dt= ?,pol_ren_dt= ?,partn_nbr= ?,prfl_id= ?,cnsm_id= ?,cos_pnl_nbr= ?,src_tmstmp= ?,row_tmstmp= ?,updt_dttm= ?,cos_div_cd= ?,mkt_typ_cd= ?,cos_grp_nbr= ?,lgcy_prdt_typ_cd= ?,lgcy_prdt_cd= ?,cov_lvl_typ_cd= ?,shr_arng_cd= ?,shr_arng_oblig_cd= ?,lgcy_pln_var_cd= ?,lgcy_rpt_cd= ?,prdt_srvc_typ_cd= ?,ee_sts_typ_cd= ?,govt_pgm_typ_cd= ?,clm_sys_typ_cd= ?,elig_sys_typ_cd= ?,ces_grp_nbr= ?,mkt_site_cd= ?,row_sts_cd= ?,medica_trvlben_ind= ?,row_user_id= ?,sec_typ_cd= ?,cancel_rsn_typ_cd= ?,cov_pd_thru_rsn_cd= ?,list_bill_typ_cd= ?,billing_sufx_cd= ?,billing_subgrp_nbr= ?,retro_days= ?,retro_typ_cd= ?,retro_ovrd_typ_cd= ?,tops_cov_lvl_typ_cd= ?,lgcy_ben_pln_id= ?,lgcy_prdt_id= ?,rr_ben_grp_nbr= ?,rr_ben_grp_cho_cd= ?,rr_br_cd= ?,rr_un_cd= ?,rr_optout_plan_ind= ?,updt_typ_cd= ?,racf_id= ?,prr_cov_mo= ?,fund_typ_cd= ?,state_of_issue_cd= ?,cobra_mo= ?,cobra_qual_evnt_cd= ?,grndfathered_pol_ind= ?,deriv_cov_ind= ?,cnsm_lgl_enty_nm= ?,indv_grp_typ_cd= ?,src_cov_mnt_typ_cd= ?,pbp_cd= ?,h_cntrct_id= ?,risk_typ_cd= ?,bil_typ_cd= ?,rate_cov_typ_cd= ?,plan_cd= ?,seg_id= ?,src_sys_id= ?

Данные, которые необходимо поместить в "?" заполнитель хранится в другом фрейме данных с именем inputdatafiledfwindow.

Отображение для столбцов, т. е. функции для установки значений в подготовленном выражении, должно генерироваться динамически.

          val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

inputdatafiledfwindow.coalesce(10).foreachPartition(partition => {
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
                st.setShort(1, row.getShort(0))
                st.setInt(2, row.getInt(1))
                st.setString(3, row.getString(2).replaceAll("\\000", ""))
                st.setString(4, row.getString(3).replaceAll("\\000", ""))
                st.setString(5, row.getString(4).replaceAll("\\000", ""))
                st.setString(6, row.getString(5).replaceAll("\\000", ""))
                st.setDate(7, row.getDate(6))
                st.setDate(8, row.getDate(7))
                st.setString(9, row.getString(8).replaceAll("\\000", ""))
                st.setString(10, row.getString(9).replaceAll("\\000", ""))
                st.setString(11, row.getString(10).replaceAll("\\000", ""))
                st.setString(12, row.getString(11).replaceAll("\\000", ""))
                st.setString(13, row.getString(12).replaceAll("\\000", ""))
                st.setString(14, row.getString(13).replaceAll("\\000", ""))
                st.setString(15, row.getString(14).replaceAll("\\000", ""))
                st.setString(16, row.getString(15).replaceAll("\\000", ""))
                st.setString(17, row.getString(16).replaceAll("\\000", ""))
                st.setString(18, row.getString(17).replaceAll("\\000", ""))
                st.setString(19, row.getString(18).replaceAll("\\000", ""))
                st.setString(20, row.getString(19).replaceAll("\\000", ""))
                st.setString(21, row.getString(20).replaceAll("\\000", ""))
                st.setString(22, row.getString(21).replaceAll("\\000", ""))
                st.setString(23, row.getString(22).replaceAll("\\000", ""))
                st.setString(24, row.getString(23).replaceAll("\\000", ""))
                st.setString(25, row.getString(24).replaceAll("\\000", ""))
                st.setString(26, row.getString(25).replaceAll("\\000", ""))
          }

          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

В настоящее время я ' Я пытаюсь что-то вроде этого:

val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)

inputdatafiledfwindow.coalesce(10).foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(inputdatafiledfwindow.columns)
      val columnDataTypes_br = sc.broadcast(inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString))
      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString, user.toString, password.toString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames.length-1) {
            if (columnDataTypes(i) == "ShortType")
              st.setShort((i+1).toInt, row.getShort(i))
            else if(columnDataTypes(i)== "IntegerType")
              st.setInt((i+1).toInt,row.getInt(i))
            else if (columnDataTypes(i)=="StringType")
              st.setString((i+1).toInt,row.getString(i))
            else if(columnDataTypes(i)=="TimestampType")
              st.setTimestamp((i+1).toInt, row.getTimestamp(i))
            else if(columnDataTypes(i)=="DateType")
              st.setDate((i+1).toInt,row.getDate(i))
            else if (columnDataTypes(i)=="DoubleType")
              st.setDouble((i+1).toInt, row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

И это дает мне: org. apache .spark.SparkException: Ошибка задачи не сериализуема

Любые идеи или ресурсы, которые я могу использовать для реализации это. Я знаю, что это возможно в java, но я не слишком много работал в java, ни в Scala.

Редактирование: пробовал использовать braodcast переменную внутри foreachPartition Все еще получая org.apache.spark.SparkException: Task not serializable

Ниже приведен полный стек исключений:


org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2343)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:957)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:956)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:956)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
  at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
  ... 80 elided
Caused by: java.io.NotSerializableException: org.postgresql.jdbc.PgConnection
Serialization stack:
        - object not serializable (class: org.postgresql.jdbc.PgConnection, value: org.postgresql.jdbc.PgConnection@71c7a55b)
        - field (class: $iw, name: dbc, type: interface java.sql.Connection)
        - object (class $iw, $iw@22459ca5)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@788dd40c)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@31c725ed)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a367987)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@7cffd7ab)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3c615880)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@289fa6c2)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@2a5a0934)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@4a04a12a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@c5fe90a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@58b67f02)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@243a4a22)
        - field (class: $line77.$read, name: $iw, type: class $iw)
        - object (class $line77.$read, $line77.$read@5f473976)
        - field (class: $iw, name: $line77$read, type: class $line77.$read)
        - object (class $iw, $iw@70fc6803)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@26818b0)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 98 more

Обновление 2: внесены изменения, предложенные @RamGhadiyaram, но теперь они столкнулись с новой исключительной ситуацией NullPointerException. Я не понимаю, где я иду не так. Конечно, это должно быть простое решение.

Ниже приведен обновленный код:

val inputdatafiledfwindow = inputdatafiledf.select("*").withColumn("rank",row_number().over(windowSpec)).where("rank = 1").drop("rank")

    val updatequeryforInsertAndUpdate = "INSERT INTO " + schema + table_name + updatequery + " where " + schema + table_name + s".row_tmstmp < '2020-02-17 00:00:00' OR ${table_name}.row_tmstmp < ?"

    val columnNames: scala.Array[String] = inputdatafiledfwindow.columns
    val columnDataTypes: scala.Array[String] = inputdatafiledfwindow.schema.fields.map(x=>x.dataType).map(x=>x.toString)


    inputdatafiledfwindow.foreachPartition(partition  => {
      val columnNames_br = sc.broadcast(columnNames)
      val columnDataTypes_br = sc.broadcast(columnDataTypes)

      val dbc: Connection = DriverManager.getConnection(jdbcConnectionString)
      val st  = dbc.prepareStatement(updatequeryforInsertAndUpdate)
      partition.grouped(50).foreach(batch => {
        batch.foreach { row => {
          for (i<-0 to columnNames_br.value.length-1) {
            if (columnDataTypes_br.value(i) == "ShortType")
              st.setShort((i+1), row.getShort(i))
            else if(columnDataTypes_br.value(i)== "IntegerType")
              st.setInt((i+1),row.getInt(i))
            else if (columnDataTypes_br.value(i)=="StringType")
              st.setString((i+1),row.getString(i))
            else if(columnDataTypes_br.value(i)=="TimestampType")
              st.setTimestamp((i+1), row.getTimestamp(i))
            else if(columnDataTypes_br.value(i)=="DateType")
              st.setDate((i+1),row.getDate(i))
            else if (columnDataTypes_br.value(i)=="DoubleType")
              st.setDouble((i+1), row.getDouble(i))
          }
          st.addBatch()
        }
        }
        st.executeBatch()
      })
      dbc.close()
    })

и ниже новый стек исключений:

20/03/25 11:12:49 WARN TaskSetManager: Lost task 0.0 in stage 19.0 (TID 176, dbslp1102.uhc.com, executor 4): java.lang.NullPointerException
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:87)
        at $line89.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/03/25 11:12:49 ERROR TaskSetManager: Task 0 in stage 19.0 failed 4 times; aborting job
20/03/25 11:12:49 WARN TaskSetManager: Lost task 11.2 in stage 19.0 (TID 210, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 4.3 in stage 19.0 (TID 204, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 7.1 in stage 19.0 (TID 201, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 2.2 in stage 19.0 (TID 205, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 12.2 in stage 19.0 (TID 206, dbslp1102.uhc.com, executor 2): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 10.2 in stage 19.0 (TID 207, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 3.2 in stage 19.0 (TID 209, dbslp1102.uhc.com, executor 5): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 13.0 in stage 19.0 (TID 208, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
20/03/25 11:12:49 WARN TaskSetManager: Lost task 8.3 in stage 19.0 (TID 202, dbslp1102.uhc.com, executor 4): TaskKilled (stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 203, dbslp1102.uhc.com, executor 5): java.lang.NullPointerException
        at $anonfun$1.apply(<console>:87)
        at $anonfun$1.apply(<console>:86)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
  at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
  at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
  ... 86 elided
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:87)
  at $anonfun$1.apply(<console>:86)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

1 Ответ

1 голос
/ 23 марта 2020

Как я понял, вам нужно динамически сгенерировать все столбцы и вставить в соответствующий столбец данных с правильным типом данных.

для этого для существующего фрейма данных

    val columnNames : Array[String] = inputdatafiledfwindow.columns

    val columnDataTypes : Array[String] = inputdatafiledfwindow.schema.fields
                            .map(x=>x.dataType)
                            .map(x=>x.toString)

Теперь вы получил столбцы и соответствующие им типы данных.

вы реализуете в своем l oop, динамически проверяя типы данных и вызывая соответствующий метод psmt.setxxx для подготовленного заявления. а индекс массива - это индекс параметра для setXXX.

В этом случае шаблон пружины jdb c не требуется, этого можно добиться и с помощью jdb c.

UPDATE1 :


Ваши типы столбцов, например, columnDataTypes, и массив имен столбцов, например, columnNames, должны транслироваться с использованием широковещательной переменной, чтобы использовать их в части foreachpartition, которая может быть root причина ошибки .. org.apache.spark.SparkException: Task not serializable error


ОБНОВЛЕНИЕ 2: Причина: java .io.NotSerializableException: org. postgresql .jdb c .PgПодключение где-нибудь, где ваш проблема с подключением

AFAIK, пользователь и пароль, которые вы конвертируете как строку, которая может быть из другого объекта ... но обычно это выглядит нормально с вашим кодом. перепроверьте это.

В соответствии с документами Spark вы можете также объявить URL вместе с пользователем и паролем, как это. попробуйте.

JDB C К другим базам данных - URL JDB C для подключения. Специфичные для источника c свойства соединения могут быть указаны в URL. Например, jdbc:postgresql://localhost/test?user=fred&password=secret

ОБНОВЛЕНИЕ 3: Причина исключения нулевого указателя является прямой. если вы работаете с нулевым значением, это приведет к исключению нулевого указателя

Например: row.getString(8).replaceAll("\\000", "") если row.getString(8) равно нулю и вы применяете replaceAll, то это исключение нулевого указателя. Вы должны проверить, не является ли row.getString(8) ненулевым, а затем применить функцию replaceAll ..

. Лучший способ избежать нулевых указателей - использовать опцию scala.

Еще одним наблюдением является использование scala для каждого l oop вместо java традиционного l oop.

ПРИМЕЧАНИЕ: ПОЖАЛУЙСТА, ЗАДАТЬ ОТДЕЛЬНЫЙ ВОПРОС ДЛЯ КАЖДОГО ТРЕБОВАНИЯ. НЕ СМЕШАЙТЕ НЕПРАВИЛЬНО. ОНО ОЧЕНЬ ПОМОЖЕТ ДРУГИМ

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...