Я столкнулся со следующей проблемой при работе с UDF в Spark. Код прекрасно работал, когда я запускал команды в spark2-shell, но тот же самый код выдаёт ошибку, когда он скомпилирован в jar и запущен через spark2-submit.
// Load the table named by `input_table` as a DataFrame.
val input_df: DataFrame = spark.table(input_table)
/** UDF that maps a string to its `hashCode`.
  *
  * The input is wrapped in `Option` so that a null value yields a null result
  * instead of throwing a NullPointerException inside the executor task.
  */
def hash_fn: UserDefinedFunction =
  udf((value: String) => Option(value).map(_.hashCode))
// Append "new_column" holding the hash of "old_column".
val hash_df: DataFrame = input_df.withColumn("new_column", hash_fn($"old_column"))
Трассировка стека приведена ниже:
Exception during processing!
*** START STACK TRACE ***
org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 80.0 failed 4 times, most recent failure: Lost task 18.3 in stage 80.0 (TID 13680, xxxxxxxxxxxxxx, executor 4): java.lang.NoClassDefFoundError: Could not initialize class com.abc.xxxx.class_defg$
Я не понимаю, почему та же самая функция завершается сбоем в кластере. Пожалуйста, помогите.
Команда spark2-submit приведена ниже:
spark2-submit --master yarn --deploy-mode cluster --driver-memory 20g --num-executors 2 --executor-cores 5 --executor-memory 8g --conf spark.yarn.executor.memoryOverhead=4g --queue queue_name --conf spark.sql.shuffle.partitions=250 --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=10 --conf hive.database=dev --conf hive.output.min_number_of_files=1 --class com.abc.xxxx.class_defg ./input_new.jar
Трассировка стека драйвера приведена ниже:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2405)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2404)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2404)
at class com.abc.xxxx.class_defg$.get_hash_df(class_defg.scala:156)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:646)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class class com.abc.xxxx.class_defg$
После того как я изменил программу и стал вызывать udf внутри пользовательской функции (org.apache.spark.sql.expressions.UserDefinedFunction), всё выполнилось нормально. Может ли кто-нибудь объяснить мне, в чём разница и почему spark-submit не работает в режиме кластера YARN?