Это правильный способ использовать Spark трансляцию в пакете Scala - PullRequest
0 голосов
/ 16 мая 2019

Я пишу записную книжку на Python, и мне нужно делегировать часть работы в Scala UDF.Я хочу иметь возможность передать карту Python, заставить мою Scala транслировать карту, а затем использовать ее из UDF.

Единственный код, который я могу придумать, который не вызывает проблем с сериализацией задач, этоэто:

%scala

package com.scalatest

import org.apache.spark.sql.expressions.UserDefinedFunction
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import collection.mutable.HashMap
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions._

class ScalaSparkTest(jsc: JavaSparkContext, lookups: 
  java.util.HashMap[String,String])
{
  object LookupObj extends Serializable
  {
    val lookup_bc = 
      JavaSparkContext.toSparkContext(jsc).broadcast(lookups.asScala.toMap)  
    def lookupUdf = udf((c: String) => lookup_bc.value.get(c))
  }

  def lookupTest(df: DataFrame, i: String, o: String):
      DataFrame = {
        return df.withColumn(o, LookupObj.lookupUdf(col(i)))
      }
}

И затем вызов из PySpark:

scalaTest = sc._jvm.com.scalatest.ScalaSparkTest(sc._jsc, lookups)
results = DataFrame(scalaTest.lookupTest(df._jdf,'A','B'),sqlContext)
results = results.show()

Это работает, но мне интересно, если это неэффективно.В этом коде происходит трансляция поисков каждый раз, когда я вызываю функцию, или только один раз?

В моем случае я создаю объект класса один раз, поэтому должен быть только один экземпляр объекта, поэтому я предполагаюон транслируется только один раз?

Также я не понимаю, почему мне нужно расширить Serializable на объекте, чтобы заставить его работать?

Извинения Я не эксперт по Scala.Я просто хочу, чтобы однажды транслировалась карта Python и много раз она использовалась моим Scala.

...