Я запускаю искровое задание, в котором я использую jdb c для получения таблицы из источника данных (mysql) и записи в другую таблицу. Моя цель - проверить, является ли выполненная операция недействительной (в данном случае Insert). Если он недопустим, то исключение перехватывается и выводится на консоль вместе с запросом sql, вызвавшим ошибку. Я не знаю, как получить запрос, поскольку сам не указывал его.
Я использую автономный Spark.
Вот фрагмент кода:
override def function(parameter: util.List[(MutationType, Dataset[Row])]): Unit = {
val properties = new Properties()
properties.put("user", username)
properties.put("password", password)
properties.put("driver", driver)
parameter.asScala.foreach(par => {
val mutationType = par._1
val mutation = par._2
mutationType match {
case MutationType.INSERT =>
try{mutation.write.mode(SaveMode.Append).jdbc(url, tableName, properties)}
catch{
case insert: Exception => {
Logger.getLogger(this.getClass()).error("Error: "+ tableName + url) // I need to Log the SQL query here
throw insert}
}
case _ => throw new RuntimeException("JDBC output does not support mutation type: " + mutationType)
}
})
}