Spark udf работал в spark2-shell, но не работал в spark-submit, если в выражениях определена та же функция. - PullRequest
0 голосов
/ 18 октября 2018

Я столкнулся со следующей проблемой при работе с udf в spark.Он прекрасно работал, когда я запускал команды в spark2-shell, но то же самое выдает ошибку, когда jar соблюдается и запускается spark2-submit.

val input_df: DataFrame = spark.table(input_table)
def hash_fn: UserDefinedFunction = udf((value: String) => value.hashCode )
val hash_df: DataFrame = input_df.withColumn("new_column", hash_fn($"old_column"))

Stacktrace как указано ниже

Exception during processing!
 *** START STACK TRACE ***
org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 80.0 failed 4 times, most recent failure: Lost task 18.3 in stage 80.0 (TID 13680, xxxxxxxxxxxxxx, executor 4): java.lang.NoClassDefFoundError: Could not initialize class com.abc.xxxx.class_defg$

Я не уверен, почему в кластере происходит сбой той же функции.пожалуйста помоги.

Команда Spark submit - как показано ниже

spark2-submit --master yarn --deploy-mode cluster --driver-memory 20g --num-executors 2 --executor-cores 5 --executor-memory 8g --conf spark.yarn.executor.memoryOverhead=4g --queue queue_name --conf spark.sql.shuffle.partitions=250 --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=10 --conf hive.database=dev --conf hive.output.min_number_of_files=1 --class com.abc.xxxx.class_defg  ./input_new.jar

, а трассировка стека драйверов - как показано ниже

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2405)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2404)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2404)
    at class com.abc.xxxx.class_defg$.get_hash_df(class_defg.scala:156)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:646)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class class com.abc.xxxx.class_defg$

После изменения программы и вызова udf внутри пользовательской функции(org.apache.spark.sql.expressions.UserDefinedFunction), он выполнен нормально.Может ли кто-нибудь объяснить мне разницу и почему не работает spark-submit в режиме кластера пряжи.

...