Задание Spark выдает исключение нулевого указателя при работе в кластере - PullRequest
0 голосов
/ 07 мая 2020

Я пытаюсь написать собственный oracle писатель, чтобы можно было использовать пул соединений. Этот код отлично работает при запуске на моем локальном компьютере (один узел), но не работает с ошибкой ниже в кластере с несколькими узлами, и я не могу понять, почему?

Ошибка

Caused by: java.lang.NullPointerException
    at com.apple.lifeisgood.starters.OracleStreamingStarterWriter10$$anonfun$1.apply(OracleStreamingStarter10.scala:37)
    at com.apple.lifeisgood.starters.OracleStreamingStarterWriter10$$anonfun$1.apply(OracleStreamingStarter10.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    ... 3 more"

Код

object CustomOracleWriter extends App {
  val logger = Logger.getLogger("Server")
  var ratings_path = "***"

  val spark = SparkSession
    .builder()
    .appName("HomeAlone")
    .master("local[4]")
    .getOrCreate()

  val ratingDF: DataFrame = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ";")
    .load(ratings_path)
    .repartition(4)
    .cache()
  logger.info(ratingDF.count())

  ratingDF.foreachPartition { part =>
    val connection = DatasourcePool.connectionPool.getConnection
    logger.info("PartitionID:" + TaskContext.getPartitionId() + "connectionID:" + connection)
    val pre_sql_query = "INSERT INTO MY_SCHEMA.MY_TABLE (user_id,movie_id,rating,rated_at) VALUES(?,?,?,?)"
    var inc = 0
    val insertStatement: PreparedStatement = connection.prepareStatement(pre_sql_query)
    part.foreach { row =>
      insertStatement.setString(1, row.get(0).toString)
      insertStatement.setString(2, row.get(1).toString)
      insertStatement.setString(3, row.get(2).toString)
      insertStatement.setString(4, row.get(3).toString)
      insertStatement.addBatch()
      inc += 1
      if (inc == 10000) { //batching for 10k
        logger.info("Executing batch")
        insertStatement.executeBatch()
        inc = 0
      }
    }
    logger.info("Executing final batch")
    insertStatement.executeBatch()
    logger.info("Executed final batch")
    connection.close()
  }

}

object DatasourcePool{
  val connectionPool = new BasicDataSource()
  connectionPool.setDriverClassName("oracle.jdbc.driver.OracleDriver")
  connectionPool.setUsername("***")
  connectionPool.setPassword("****")
  connectionPool.setUrl("***")
  connectionPool.setInitialSize(3)
}

Строка 35 в приведенном выше коде ниже

logger.info("PartitionID:" + TaskContext.getPartitionId() + "connectionID:" + connection)
...