Я пытаюсь написать собственный oracle писатель, чтобы можно было использовать пул соединений. Этот код отлично работает при запуске на моем локальном компьютере (один узел), но не работает с ошибкой ниже в кластере с несколькими узлами, и я не могу понять, почему?
Ошибка
Caused by: java.lang.NullPointerException
at com.apple.lifeisgood.starters.OracleStreamingStarterWriter10$$anonfun$1.apply(OracleStreamingStarter10.scala:37)
at com.apple.lifeisgood.starters.OracleStreamingStarterWriter10$$anonfun$1.apply(OracleStreamingStarter10.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
... 3 more"
Код
object CustomOracleWriter extends App {
val logger = Logger.getLogger("Server")
var ratings_path = "***"
val spark = SparkSession
.builder()
.appName("HomeAlone")
.master("local[4]")
.getOrCreate()
val ratingDF: DataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("sep", ";")
.load(ratings_path)
.repartition(4)
.cache()
logger.info(ratingDF.count())
ratingDF.foreachPartition { part =>
val connection = DatasourcePool.connectionPool.getConnection
logger.info("PartitionID:" + TaskContext.getPartitionId() + "connectionID:" + connection)
val pre_sql_query = "INSERT INTO MY_SCHEMA.MY_TABLE (user_id,movie_id,rating,rated_at) VALUES(?,?,?,?)"
var inc = 0
val insertStatement: PreparedStatement = connection.prepareStatement(pre_sql_query)
part.foreach { row =>
insertStatement.setString(1, row.get(0).toString)
insertStatement.setString(2, row.get(1).toString)
insertStatement.setString(3, row.get(2).toString)
insertStatement.setString(4, row.get(3).toString)
insertStatement.addBatch()
inc += 1
if (inc == 10000) { //batching for 10k
logger.info("Executing batch")
insertStatement.executeBatch()
inc = 0
}
}
logger.info("Executing final batch")
insertStatement.executeBatch()
logger.info("Executed final batch")
connection.close()
}
}
object DatasourcePool{
val connectionPool = new BasicDataSource()
connectionPool.setDriverClassName("oracle.jdbc.driver.OracleDriver")
connectionPool.setUsername("***")
connectionPool.setPassword("****")
connectionPool.setUrl("***")
connectionPool.setInitialSize(3)
}
Строка 35 в приведенном выше коде ниже
logger.info("PartitionID:" + TaskContext.getPartitionId() + "connectionID:" + connection)