как сделать рефлекМетод как UDF - PullRequest
0 голосов
/ 21 ноября 2018

есть проблема для меня :( Ко мне люди помогают:)

это мое требование: динамическая загрузка jars + регистрация UDF.

код:

 //Test.ImTest
 object ImTest extends Serializable {
    def len(bookTitle: String):String =
          {"ImTest"}
      }

    // main 
    val ru = scala.reflect.runtime.universe
    val classMirror = ru.runtimeMirror(getClass.getClassLoader)
    val classTest = classMirror.staticModule("Test.ImTest")
    val methods = classMirror.reflectModule(classTest)
    val objMirror = classMirror.reflect(methods.instance)

    val method = methods.symbol.typeSignature.member(ru.TermName("len")).asMethod

    val result = objMirror.reflectMethod(method)("bbb")

    def d(s: String) = {
      objMirror.reflectMethod(method)(s)
    }
    spark.udf.register("len", d _)
    spark.sql("select len('bb')").show()

ошибки:

18/11/21 09:58:52 INFO execution.SparkSqlParser: Parsing command: select len('bb')
18/11/21 09:58:54 INFO codegen.CodeGenerator: Code generated in 475.734954 ms
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2106)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:839)

Причина: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols $ SynchronizedSymbol $$ anon $ 9 Стек сериализации: - объект не сериализуем (класс: scala.reflect.runtime.SynchronizedSymbols $ SynchronizedSymbol $$ anon $ 9, значение: метод len) - поле (класс: Test.Main $$ anonfun $ main $ 1, имя: метод $ 1, тип: интерфейс scala.reflect.api.Symbols$ MethodSymbolApi) - объект (класс Test.Main $$ anonfun $ main $ 1,) - поле (класс: org.apache.spark.sql.catalyst.expressions.ScalaUDF $$ anonfun $ 2, имя: func $ 2, тип: интерфейс scala.Function1) - объект (класс org.apache.spark.sql.catalyst.expressions.ScalaUDF $$ anonfun $ 2,) - поле (класс: org.apache.spark.sql.catalyst.expressions.ScalaUDF, имя: f, тип: interface scala.Function1) - объект (класс org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF: len (bb)) - элемент массива (индекс: 0) - массив (класс [Ljava.lang.Object ;, размер 2) - поле (класс: org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8, имя: ссылки $ 1, тип: class [Ljava.lang.Object;) - объект (класс org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8,) в org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46) в org.apache.spark.serializer.JavaSerializerInstance.serialsc (Java) в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 295) ... еще 44

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...