Spark SQL (язык, не API) и доступ к данным строки из UDF - PullRequest
2 голосов
/ 25 февраля 2020

Я использую собственные функции Spark UDF в выражениях Spark SQL (язык SQL) (не через Spark API). В случае сбоя внутри моей функции UDF я бы хотел получить доступ ко всей строке со всеми столбцами и предоставить эту информацию (например, через пользовательские исключения или журналы) для лучшей обработки ошибок.

Прямо сейчас я не Не знаю, как получить доступ к столбцам строк внутри моего UDF или даже как передать все столбцы в мой UDF через SQL. Пожалуйста, предложите.

1 Ответ

2 голосов
/ 25 февраля 2020

Вы можете передать всю строку в качестве дополнительного аргумента с помощью struct("*") или struct(*) в SQL. Пример:

val df = Seq(
  (1, Option.empty[String], 20)
).toDF("id", "name", "age")

val myUDF = udf((name: String, row: Row) =>
  try {
    Some(name.toLowerCase())
  } catch {
    case e: Exception => println(row.mkString(","))
      None
  }
)

df
  .select(myUDF($"name",struct("*")))
  .show()

затем вы увидите содержимое строки (в данном случае 1,null,20) в журналах. Так как журналы находятся на удаленных машинах, это может разочаровать.

Подробнее об отладке / обработке исключений: Вы можете распространить исключение на драйвер, используя повторное генерирование исключения со строкой -представление строки как сообщения. Обратите внимание, что ваша работа не будет выполнена, если возникнет исключение:

val myUDF = udf((name: String, row: Row) =>
  try {
    name.toLowerCase()
  } catch {
    case e: Exception => throw new Exception("row : "+row.mkString(","),e)
  }
)

Мое предпочтительное решение - это вернуть дополнительный столбец из UDF, содержащий сообщение об ошибке, это также не остановит искру работа в случае ошибки:

val myUDF = udf((name: String) => {
  val result: (Option[String], Option[String]) = try {
    (Option(name.toLowerCase()), None)
  } catch {
    case e: java.lang.Exception  => (None, Option(e.toString()))
  }
  result
}
)

df
  .withColumn("tmp",myUDF($"name"))
  .withColumn("udf_result",$"tmp._1")
  .withColumn("error",$"tmp._2").drop($"tmp")
  .show(false) 

+---+----+---+----------+------------------------------+
|id |name|age|udf_result|error                         |
+---+----+---+----------+------------------------------+
|1  |null|20 |null      |java.lang.NullPointerException|
+---+----+---+----------+------------------------------+

вот так, нет необходимости передавать всю строку в udf, вы можете просто отфильтровать вашу df для df.where($"error".isNotNull)

...