Spark обрабатывает UDF, оборачивая их внутри класса.Например, когда вы пишете следующее:
val example = udf((a: Int) => a * 2)
Функция udf
создает класс UserDefinedFunction
, который в своей функции apply создает расширение ScalaUDF.
ScalaUDF Expression и в своем методе doCodeGenона делает следующее:
...
val callFunc =
s"""
|$boxedType $resultTerm = null;
|try {
| $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult);
|} catch (Exception e) {
| throw new org.apache.spark.SparkException($errorMsgTerm, e);
|}
""".stripMargin
ev.copy(code =
code"""
|$evalCode
|${initArgs.mkString("\n")}
|$callFunc
...
Эта функция преобразует DataType
столбца / выражения в тип Scala (потому что ваш UDF работает с типами scala), а затем вызывает вашу лямбду.deterministic,
nullable,
и dataTypes
являются функциями оболочки пользовательской функции, потому что она расширяет Expression, а не вашу функцию.Если вы хотите в полной мере использовать их, вам нужно написать собственное выражение, которое расширяет Expression
или один из его подклассов.
В качестве примера рассмотрим следующее:
val redundantUdf = udf((a: Long) => true)
someDf.filter(redundantUdf(someDf("col1"))).explain()
Оптимизированный логический план будет выглядеть примерно так:
Project [_1#5736 AS Type#5739, _2#5737L AS sts#5740L]
+- Filter UDF(_2#5737L)
+- LocalRelation [_1#5736, _2#5737L]
Как вы можете видеть, он выполняет фильтр, даже если он избыточен и всегда будет иметь значение true.
Принимая во внимание, чтоследующее:
someDf.filter(expr("true")).explain()
даст следующий оптимизированный логический план:
LocalRelation [Type#5739, sts#5740L]
Он удаляет фильтр, используя правило PruneFilter.
Это не означает, что все оптимизацииисключаются, есть оптимизации, которые все еще работают с пользовательскими функциями, такими как CombineFilter
, который комбинирует выражение из двух фильтров, например:
== Analyzed Logical Plan ==
_1: string, _2: string
Filter UDF(_1#2)
+- Filter UDF(_1#2)
+- LocalRelation [_1#2, _2#3]
== Optimized Logical Plan ==
Filter (UDF(_1#2) && UDF(_1#2))
+- LocalRelation [_1#2, _2#3]
Эта оптимизация работает, потому что она зависит только от поля deterministic
иUDF по умолчанию являются детерминированными.Таким образом, UDF получат выгоду от простых оптимизаций, которые не зависят от функции, которую он оборачивает.Это потому, что он находится в формате, который катализатор не понимает, катализатор работает на деревьях, а ваше закрытие - функция Scala.Существуют и другие места, где UDF теряют свои значения, например, указание сгенерированного кода Java и информации о типе искры.