В задании структурированной потоковой передачи Spark 2.3.0 мне нужно добавить столбец в DataFrame, который получен из значения той же строки существующего столбца.
Я хочу определить это преобразование в UDF и используйте withColumn для создания нового DataFrame.
Для выполнения этого преобразования требуется обращение к очень дорогостоящему для создания ссылочному объекту - создание его один раз для каждой записидает неприемлемую производительность.
Каков наилучший способ создания и сохранения этого объекта один раз для каждого рабочего узла, чтобы на него можно было повторно ссылаться для каждой записи в каждом пакете?Обратите внимание, что объект не сериализуем.
Мои текущие попытки вращаются вокруг подкласса UserDefinedFunction , чтобы добавить дорогой объект в качестве ленивого члена и предоставить альтернативный конструктор для этого подкласса, который обычно выполняет initвыполняется функцией udf , но я до сих пор не смог заставить ее выполнить тот тип приведения типов, который делает udf
- некоторый глубокий вывод типа требует объекты типа org.apache.spark.sql.Column
, когдамоя трансформация лямбда работает со строкой для ввода и вывода.
Примерно так:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataType
class ExpensiveReference{
def ExpensiveReference() = ... // Very slow
def transformString(in:String) = ... // Fast
}
class PersistentValUDF(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]) extends UserDefinedFunction(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]){
lazy val ExpensiveReference = new ExpensiveReference()
def PersistentValUDF(){
this(((in:String) => ExpensiveReference.transformString(in) ):(String => String), StringType, Some(List(StringType)))
}
}
Чем дальше я копаю в этой кроличьей норе, тем больше подозреваю, что есть лучший способ сделать эточто я с видомОтсюда и этот пост.
Редактировать: я тестировал ленивую инициализацию ссылки в объекте, объявленном в UDF;это вызывает повторную инициализацию.Пример кода и объекта
class IntBox {
var valu = 0;
def increment {
valu = valu + 1
}
def get:Int ={
return valu
}
}
val altUDF = udf((input:String) => {
object ExpensiveRef{
lazy val box = new IntBox
def transform(in:String):String={
box.increment
return in + box.get.toString
}
}
ExpensiveRef.transform(input)
})
Приведенный выше UDF всегда добавляет 1;поэтому ленивый объект переинициализируется для каждой записи.