Scala объект применяет метод, никогда не вызываемый в Spark Job - PullRequest
0 голосов
/ 07 мая 2019

Я пытаюсь отделить свою логику в приложении spark .Я создал отдельный класс для определений UDF и объявлений UDF :

Декларация UDF:

import OPXUdfDefinitions._ 
object OPXUdf extends Serializable {
  def apply(argsInput: Map[String, String]) = {
    OPXUdfDefinitions(argsInput)
  }
  val myUDF = udf(myDef _)
}

Определение UDF:

object OPXUdfDefinitions extends Serializable{

  private var args: Map[String, String] = _

  def apply(argsInput: Map[String, String]) = {
    args = argsInput
    println("Inside Apply UDFDef" + args)                       // This never called
  }

  lazy val path = args.getOrElse(PATH, "/path/")               // Here is a NullPointerException
  lazy val myCustomObj = new MyClass(path)

  def myDef(a: Integer, b: Integer) = { 
    myCustomObj.getValue(util.Arrays.asList(a, b))
  }

}

Затем у меня есть класс , который вызывает UDF:

import OPXUdf._
class Tasks(args: Map[String, String]){
  def getValue()(df: DataFrame): DataFrame = {
    df.withColumn("Value", myUDF(col("Num1"), col("Num2")))
  }
}

Затем у меня есть абстрактный класс для моей работы ETL и Определение :

case class EtlDefinition(
                          sourceDF: DataFrame,
                          transform: (DataFrame => DataFrame),
                          write: (DataFrame => Unit),
                          metadata: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map[String, Any]()
                        ) extends Serializable {

  def process(): Unit = {
    write(sourceDF.transform(transform))
  }
}

abstract class ETLJob extends Serializable{

  def read(): DataFrame
  def transform()(df: DataFrame): DataFrame
  def write()(df:DataFrame): Unit

  def execute(): Unit ={
    // Define a base ETL Job for all the usecases
    val etl = new EtlDefinition(
      sourceDF = read(),
      transform = transform(),
      write = write()
    )

    // Execute the job
    etl.process()
  }
}

Тогда у меня есть моя работа ETL :

class MyETLJob(spark: SparkSession, args: Map[String, String]) extends ETLJob {

  val files = args.getOrElse(INPUT_PATH, null).split(",")
  val outputPath = args.getOrElse(OUTPUT_PATH, null)
  override def read(): DataFrame = {
    val inputDF = spark.read.
      format("com.databricks.spark.avro").
      load(files: _*)
    inputDF
  }

  override def transform()(df: DataFrame): DataFrame = {
    val tasks = new Tasks(args)
    val transformedDF = df
      .transform(tasks.getValue())

    transformedDF
  }

  override def write()(df: DataFrame): Unit = {
    df
      .write
      .mode("overwrite")
      .parquet(outputPath)
  }
}

Затем в моем основном классе я звоню:

object Main {
  def main(args: Array[String]): Unit = {

    // Parse the arguments and create a map
    val argParams = ArgumentParser.parse(args)
    println("ParamsMap: " + argParams)  // I am getting the map here

    OPXUdfDefinitions(argParams)
    OPXUdf(argParams)

    val conf = new SparkConf().
      set("spark.sql.shuffle.partitions", "100").
      set("parquet.enable.summary-metadata", "true")

    val spark = SparkSession.
      builder().
      appName("My App").
      config(conf).
      getOrCreate()

  val etlJob = new MyETLJob(spark, argParams)
  etlJob.execute()
}

Но мое OPXUdfDefinitions применение никогда не вызывалось, и я всегда получаю исключение нулевого указателя всякий раз, когда myUDF вызвали.Я попытался инициализировать переменную пути, используя ленивое, но все еще нулевое исключение указателя.

Вот stacktrace:

    org.apache.spark.SparkException: Failed to execute user defined function(anonfun$myUDF$1: (int, int) => string)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_3$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:217)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at com.myjob.app.OPXUdfDefinitions$.path$lzycompute(OPXUdfDefinitions.scala:37)
        at com.myjob.app.OPXUdfDefinitions$.path(OPXUdfDefinitions.scala:37)
        at com.myjob.app.OPXUdfDefinitions$.myCustomObj$lzycompute(OPXUdfDefinitions.scala:38)
        at com.myjob.app.OPXUdfDefinitions$.myCustomObj(OPXUdfDefinitions.scala:38)
        at com.myjob.app.OPXUdfDefinitions$.myDef(OPXUdfDefinitions.scala:45)
        at com.myjob.app.OPXUdf$$anonfun$myUDF$1.apply(OPXUdf.scala:19)
        at com.myjob.app.OPXUdf$$anonfun$myUDF$1.apply(OPXUdf.scala:19)

Было бы здорово, если кто-то может объяснить, чтоЯ делаю не так и как это можно исправить.Спасибо.

Ответы [ 2 ]

0 голосов
/ 07 мая 2019

Можешь попробовать?:

def myDef(a: Integer, b: Integer): String = {

и

val myUDF = udf((a: Integer, b: Integer) => myDef(a,b))
0 голосов
/ 07 мая 2019

Проблема внутри OPXUdfDefinitions, потому что, хотя path равен lazy, вы вызываете args до инициализации в методе apply при вызове println("UDFDef" + args).

Если вы удалите println(args), вы больше не должны встретить NullPointerException, например:

object OPXUdfDefinitions extends Serializable{

  private var args: Map[String, String] = _

  def apply(argsInput: Map[String, String]) = {
    args = argsInput
    println("Inside Apply UDFDef" + args)                       // This never called
  }

  //println("UDFDef" + args)   // this is initializing args
  lazy val path = args.getOrElse(PATH, "/path/")
  lazy val myCustomObj = new MyClass(path)
  //println("UDFDef" + args)    // same here

  def myDef(a: Integer, b: Integer) = { 
    myCustomObj.getValue(util.Arrays.asList(a, b))
  }

}
...