Вы можете передать всю строку в качестве дополнительного аргумента с помощью struct("*")
или struct(*)
в SQL. Пример:
val df = Seq(
(1, Option.empty[String], 20)
).toDF("id", "name", "age")
val myUDF = udf((name: String, row: Row) =>
try {
Some(name.toLowerCase())
} catch {
case e: Exception => println(row.mkString(","))
None
}
)
df
.select(myUDF($"name",struct("*")))
.show()
затем вы увидите содержимое строки (в данном случае 1,null,20
) в журналах. Так как журналы находятся на удаленных машинах, это может разочаровать.
Подробнее об отладке / обработке исключений: Вы можете распространить исключение на драйвер, используя повторное генерирование исключения со строкой -представление строки как сообщения. Обратите внимание, что ваша работа не будет выполнена, если возникнет исключение:
val myUDF = udf((name: String, row: Row) =>
try {
name.toLowerCase()
} catch {
case e: Exception => throw new Exception("row : "+row.mkString(","),e)
}
)
Мое предпочтительное решение - это вернуть дополнительный столбец из UDF, содержащий сообщение об ошибке, это также не остановит искру работа в случае ошибки:
val myUDF = udf((name: String) => {
val result: (Option[String], Option[String]) = try {
(Option(name.toLowerCase()), None)
} catch {
case e: java.lang.Exception => (None, Option(e.toString()))
}
result
}
)
df
.withColumn("tmp",myUDF($"name"))
.withColumn("udf_result",$"tmp._1")
.withColumn("error",$"tmp._2").drop($"tmp")
.show(false)
+---+----+---+----------+------------------------------+
|id |name|age|udf_result|error |
+---+----+---+----------+------------------------------+
|1 |null|20 |null |java.lang.NullPointerException|
+---+----+---+----------+------------------------------+
вот так, нет необходимости передавать всю строку в udf, вы можете просто отфильтровать вашу df для df.where($"error".isNotNull)