У меня есть ноутбук, основанный преимущественно на Python, и теперь я хочу интегрировать некоторые функциональные возможности Scala UDF (фактически портированные из Python для повышения производительности)
Эти пользовательские функции интенсивно используют карты Python, поэтому мне нужен способ получитьэти карты из Python в JVM.Не будучи экспертом по Scala, я наивно думал, что смогу написать класс Scala в пакете, а затем просто использовать его из Python:
%scala
package com.scalatest
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import collection.mutable.HashMap
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions._
class ScalaSparkTest(jsc: JavaSparkContext, lookups:
java.util.HashMap[String,String]) extends Serializable
{
val lookup = lookups
// val lookup_bc = JavaSparkContext.toSparkContext(jsc).broadcast(lookups)
def lookupUdf = udf((c: String) => lookup(c))
def lookupTest(df: DataFrame, input_col: String, output_col: String):
DataFrame = {
return df.withColumn(output_col, lookupUdf(col(input_col)))
}
}
И Python:
from pyspark.sql import Row, Column, DataFrame
rows = []
lookups = {}
for i in range(10):
c = str(i)
lookups[c] = str(i*2)
rows.append( (c,c) )
df = spark.createDataFrame(rows,['A','B'])
scalaTest = sc._jvm.com.scalatest.ScalaSparkTest(sc._jsc, lookups)
scalaTest.printType()
results = DataFrame(scalaTest.lookupTest(df._jdf,'A','C'),sqlContext)
results.show()
# ==> This is how i do it in Python
lookups_bc = sc.broadcast(lookups)
@pandas_udf(LongType())
def udf_pandas(c):
return c.map(lookups_bc.value)
result = df.select("C", udf_pandas("A"))
Теперь это всеработает как положено, но сериализует весь класс.Я не хочу, чтобы это делалось, я просто хочу передать свои запросы и сделать так, чтобы они транслировались без необходимости посылать весь класс каждой работе.
Учитывая код, который у меня есть, как я могу изменить его, чтобы я былв состоянии транслировать только поиски, а затем использовать в моем UDF?Это легко достигается в Python, используя глобальную переменную для хранения широковещания и позволяя закрытию разбирать его.С Scala, пакетами и классами я не знаю, как получить требуемое поведение, поэтому любые примеры приветствуются.
ПРИМЕЧАНИЕ: Я знаю, что поиск доступен как встроенные функции, но мой вариант использованияболее сложный, чем мой пример кода.
ОБНОВЛЕНИЕ: В ответ на дубликат флага.У меня нет проблемы с вызовом Scala из PySpark, но я не знаю, как это сделать, это создать переменную широковещания в Scala из PySpark, которая затем будет доступна UDF в пакете.У меня есть список карт Python, я хочу, чтобы эти карты транслировались в Scala, а переменная широковещания могла быть доступна через мой UDF.Я не знаю, как структурировать мой Scala и / или взаимодействие с Python