Я пытаюсь отделить свою логику в приложении spark .Я создал отдельный класс для определений UDF и объявлений UDF :
Декларация UDF:
import OPXUdfDefinitions._
object OPXUdf extends Serializable {
def apply(argsInput: Map[String, String]) = {
OPXUdfDefinitions(argsInput)
}
val myUDF = udf(myDef _)
}
Определение UDF:
object OPXUdfDefinitions extends Serializable{
private var args: Map[String, String] = _
def apply(argsInput: Map[String, String]) = {
args = argsInput
println("Inside Apply UDFDef" + args) // This never called
}
lazy val path = args.getOrElse(PATH, "/path/") // Here is a NullPointerException
lazy val myCustomObj = new MyClass(path)
def myDef(a: Integer, b: Integer) = {
myCustomObj.getValue(util.Arrays.asList(a, b))
}
}
Затем у меня есть класс , который вызывает UDF:
import OPXUdf._
class Tasks(args: Map[String, String]){
def getValue()(df: DataFrame): DataFrame = {
df.withColumn("Value", myUDF(col("Num1"), col("Num2")))
}
}
Затем у меня есть абстрактный класс для моей работы ETL и Определение :
case class EtlDefinition(
sourceDF: DataFrame,
transform: (DataFrame => DataFrame),
write: (DataFrame => Unit),
metadata: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map[String, Any]()
) extends Serializable {
def process(): Unit = {
write(sourceDF.transform(transform))
}
}
abstract class ETLJob extends Serializable{
def read(): DataFrame
def transform()(df: DataFrame): DataFrame
def write()(df:DataFrame): Unit
def execute(): Unit ={
// Define a base ETL Job for all the usecases
val etl = new EtlDefinition(
sourceDF = read(),
transform = transform(),
write = write()
)
// Execute the job
etl.process()
}
}
Тогда у меня есть моя работа ETL :
class MyETLJob(spark: SparkSession, args: Map[String, String]) extends ETLJob {
val files = args.getOrElse(INPUT_PATH, null).split(",")
val outputPath = args.getOrElse(OUTPUT_PATH, null)
override def read(): DataFrame = {
val inputDF = spark.read.
format("com.databricks.spark.avro").
load(files: _*)
inputDF
}
override def transform()(df: DataFrame): DataFrame = {
val tasks = new Tasks(args)
val transformedDF = df
.transform(tasks.getValue())
transformedDF
}
override def write()(df: DataFrame): Unit = {
df
.write
.mode("overwrite")
.parquet(outputPath)
}
}
Затем в моем основном классе я звоню:
object Main {
def main(args: Array[String]): Unit = {
// Parse the arguments and create a map
val argParams = ArgumentParser.parse(args)
println("ParamsMap: " + argParams) // I am getting the map here
OPXUdfDefinitions(argParams)
OPXUdf(argParams)
val conf = new SparkConf().
set("spark.sql.shuffle.partitions", "100").
set("parquet.enable.summary-metadata", "true")
val spark = SparkSession.
builder().
appName("My App").
config(conf).
getOrCreate()
val etlJob = new MyETLJob(spark, argParams)
etlJob.execute()
}
Но мое OPXUdfDefinitions применение никогда не вызывалось, и я всегда получаю исключение нулевого указателя всякий раз, когда myUDF вызвали.Я попытался инициализировать переменную пути, используя ленивое, но все еще нулевое исключение указателя.
Вот stacktrace:
org.apache.spark.SparkException: Failed to execute user defined function(anonfun$myUDF$1: (int, int) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_3$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:217)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at com.myjob.app.OPXUdfDefinitions$.path$lzycompute(OPXUdfDefinitions.scala:37)
at com.myjob.app.OPXUdfDefinitions$.path(OPXUdfDefinitions.scala:37)
at com.myjob.app.OPXUdfDefinitions$.myCustomObj$lzycompute(OPXUdfDefinitions.scala:38)
at com.myjob.app.OPXUdfDefinitions$.myCustomObj(OPXUdfDefinitions.scala:38)
at com.myjob.app.OPXUdfDefinitions$.myDef(OPXUdfDefinitions.scala:45)
at com.myjob.app.OPXUdf$$anonfun$myUDF$1.apply(OPXUdf.scala:19)
at com.myjob.app.OPXUdf$$anonfun$myUDF$1.apply(OPXUdf.scala:19)
Было бы здорово, если кто-то может объяснить, чтоЯ делаю не так и как это можно исправить.Спасибо.