Как интегрировать карты Python, переменные трансляции и Scala UDF в Spark - PullRequest
0 голосов
/ 14 мая 2019

У меня есть ноутбук, основанный преимущественно на Python, и теперь я хочу интегрировать некоторые функциональные возможности Scala UDF (фактически портированные из Python для повышения производительности)

Эти пользовательские функции интенсивно используют карты Python, поэтому мне нужен способ получитьэти карты из Python в JVM.Не будучи экспертом по Scala, я наивно думал, что смогу написать класс Scala в пакете, а затем просто использовать его из Python:

%scala

package com.scalatest

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import collection.mutable.HashMap
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions._

class ScalaSparkTest(jsc: JavaSparkContext, lookups: 
   java.util.HashMap[String,String]) extends Serializable
{
  val lookup = lookups
//  val lookup_bc = JavaSparkContext.toSparkContext(jsc).broadcast(lookups)

  def lookupUdf = udf((c: String) => lookup(c))

  def lookupTest(df: DataFrame, input_col: String, output_col: String):
    DataFrame = {
      return df.withColumn(output_col, lookupUdf(col(input_col)))
    }
}

И Python:

from pyspark.sql import Row, Column, DataFrame

rows = []
lookups = {}

for i in range(10):
  c = str(i)
  lookups[c] = str(i*2)
  rows.append( (c,c) )
df = spark.createDataFrame(rows,['A','B'])

scalaTest = sc._jvm.com.scalatest.ScalaSparkTest(sc._jsc, lookups)
scalaTest.printType()
results = DataFrame(scalaTest.lookupTest(df._jdf,'A','C'),sqlContext)
results.show()

# ==> This is how i do it in Python
lookups_bc = sc.broadcast(lookups)

@pandas_udf(LongType())
def udf_pandas(c):
  return c.map(lookups_bc.value)

result = df.select("C", udf_pandas("A"))

Теперь это всеработает как положено, но сериализует весь класс.Я не хочу, чтобы это делалось, я просто хочу передать свои запросы и сделать так, чтобы они транслировались без необходимости посылать весь класс каждой работе.

Учитывая код, который у меня есть, как я могу изменить его, чтобы я былв состоянии транслировать только поиски, а затем использовать в моем UDF?Это легко достигается в Python, используя глобальную переменную для хранения широковещания и позволяя закрытию разбирать его.С Scala, пакетами и классами я не знаю, как получить требуемое поведение, поэтому любые примеры приветствуются.

ПРИМЕЧАНИЕ: Я знаю, что поиск доступен как встроенные функции, но мой вариант использованияболее сложный, чем мой пример кода.

ОБНОВЛЕНИЕ: В ответ на дубликат флага.У меня нет проблемы с вызовом Scala из PySpark, но я не знаю, как это сделать, это создать переменную широковещания в Scala из PySpark, которая затем будет доступна UDF в пакете.У меня есть список карт Python, я хочу, чтобы эти карты транслировались в Scala, а переменная широковещания могла быть доступна через мой UDF.Я не знаю, как структурировать мой Scala и / или взаимодействие с Python

...