Я пишу записную книжку на Python, и мне нужно делегировать часть работы в Scala UDF.Я хочу иметь возможность передать карту Python, заставить мою Scala транслировать карту, а затем использовать ее из UDF.
Единственный код, который я могу придумать, который не вызывает проблем с сериализацией задач, этоэто:
%scala
package com.scalatest
import org.apache.spark.sql.expressions.UserDefinedFunction
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import collection.mutable.HashMap
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions._
class ScalaSparkTest(jsc: JavaSparkContext, lookups:
java.util.HashMap[String,String])
{
object LookupObj extends Serializable
{
val lookup_bc =
JavaSparkContext.toSparkContext(jsc).broadcast(lookups.asScala.toMap)
def lookupUdf = udf((c: String) => lookup_bc.value.get(c))
}
def lookupTest(df: DataFrame, i: String, o: String):
DataFrame = {
return df.withColumn(o, LookupObj.lookupUdf(col(i)))
}
}
И затем вызов из PySpark:
scalaTest = sc._jvm.com.scalatest.ScalaSparkTest(sc._jsc, lookups)
results = DataFrame(scalaTest.lookupTest(df._jdf,'A','B'),sqlContext)
results = results.show()
Это работает, но мне интересно, если это неэффективно.В этом коде происходит трансляция поисков каждый раз, когда я вызываю функцию, или только один раз?
В моем случае я создаю объект класса один раз, поэтому должен быть только один экземпляр объекта, поэтому я предполагаюон транслируется только один раз?
Также я не понимаю, почему мне нужно расширить Serializable на объекте, чтобы заставить его работать?
Извинения Я не эксперт по Scala.Я просто хочу, чтобы однажды транслировалась карта Python и много раз она использовалась моим Scala.