Как создать и сохранить ссылочный объект на каждого работника в UDF Spark 2.3.0? - PullRequest
0 голосов
/ 07 июня 2018

В задании структурированной потоковой передачи Spark 2.3.0 мне нужно добавить столбец в DataFrame, который получен из значения той же строки существующего столбца.

Я хочу определить это преобразование в UDF и используйте withColumn для создания нового DataFrame.

Для выполнения этого преобразования требуется обращение к очень дорогостоящему для создания ссылочному объекту - создание его один раз для каждой записидает неприемлемую производительность.

Каков наилучший способ создания и сохранения этого объекта один раз для каждого рабочего узла, чтобы на него можно было повторно ссылаться для каждой записи в каждом пакете?Обратите внимание, что объект не сериализуем.

Мои текущие попытки вращаются вокруг подкласса UserDefinedFunction , чтобы добавить дорогой объект в качестве ленивого члена и предоставить альтернативный конструктор для этого подкласса, который обычно выполняет initвыполняется функцией udf , но я до сих пор не смог заставить ее выполнить тот тип приведения типов, который делает udf - некоторый глубокий вывод типа требует объекты типа org.apache.spark.sql.Column, когдамоя трансформация лямбда работает со строкой для ввода и вывода.

Примерно так:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataType

class ExpensiveReference{
  def ExpensiveReference() = ... // Very slow
  def transformString(in:String) = ... // Fast
}

class PersistentValUDF(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]) extends UserDefinedFunction(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]){  
  lazy val ExpensiveReference = new ExpensiveReference()
  def PersistentValUDF(){
    this(((in:String) => ExpensiveReference.transformString(in) ):(String => String), StringType, Some(List(StringType)))
  }
}

Чем дальше я копаю в этой кроличьей норе, тем больше подозреваю, что есть лучший способ сделать эточто я с видомОтсюда и этот пост.

Редактировать: я тестировал ленивую инициализацию ссылки в объекте, объявленном в UDF;это вызывает повторную инициализацию.Пример кода и объекта

class IntBox {
  var valu = 0;
  def increment {
    valu = valu + 1
  }
  def get:Int ={
    return valu
  }
}


val altUDF = udf((input:String) => {
  object ExpensiveRef{
     lazy val box = new IntBox
     def transform(in:String):String={
       box.increment
       return in + box.get.toString
     }
  }
  ExpensiveRef.transform(input)
})

Приведенный выше UDF всегда добавляет 1;поэтому ленивый объект переинициализируется для каждой записи.

Ответы [ 2 ]

0 голосов
/ 12 июня 2018

Я нашел этот пост , чей вариант 1 я смог превратить в работоспособное решение.Конечный результат оказался похож на ответ Яцека Ласковского, но с несколькими хитростями:

  1. Вытяните определение объекта за пределы области действия UDF.Даже будучи ленивым, он все равно будет повторно инициализирован, если определен в области действия UDF.
  2. Переместите функцию преобразования из объекта в лямбду UDF (необходимо, чтобы избежать ошибок сериализации)
  3. Захватите ленивый член объекта при закрытии лямбда UDF

Примерно так:

object ExpensiveReference {
  lazy val ref = ...
  }
val persistentUDF = udf((input:String)=>{
  /*transform code that references ExpensiveReference.ref*/
})
0 голосов
/ 07 июня 2018

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ Позвольте мне остановиться на этом, но, пожалуйста, сочтите, что это незавершенная работа (отрицательных голосов нет, нет:))

Что бы я сделал, чтобыиспользуйте объект Scala с lazy val для дорогой ссылки.

object ExpensiveReference {
  lazy val ref = ???
  def transform(in:String) = {
    // use ref here
  }
}

С этим объектом все, что вы делаете с исполнителем Spark (будь то часть UDF или любого другого вычисления), собирается создать экземплярExpensiveReference.ref при самом первом доступе.Вы можете получить к нему доступ напрямую или к части transform.

Опять же, на самом деле не имеет значения, делаете ли вы это в UDF или UDAF или в любом другом преобразовании.Дело в том, что как только вычисления выполняются на Spark executor «очень дорогой в создании эталонный объект - создание его один раз для каждой записи приводит к неприемлемой производительности». произойдет только один раз.

Это может быть в UDF (только чтобы прояснить).

...