Странно, я обнаружил несколько раз, что есть разница при работе с spark-submit против работающей с spark-shell (или zeppelin), хотя я в это не верю.
С некоторыми кодами spark-shell ( или zeppelin) может выдать это исключение, в то время как spark-submit просто отлично работает:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
Это пример кода (попытается упростить пример), который может вызвать проблему:
import java.time.format.DateTimeFormatter
import java.time.LocalDate
def formatter1 = DateTimeFormatter.ofPattern("MM_dd_yy")
val date1 = udf((date: String) => {val d = date.split("_").map(x => {if (x.length < 2) "0" + x else x}).mkString("_"); LocalDate.from(formatter1.parse(d)).toString})
import org.apache.spark.sql.{DataFrame}
def melt(toPreserve: Seq[String], toMelt: Seq[String], column: String, row: String, df: DataFrame) : DataFrame = {
val _vars_and_vals = array((for (c <- toMelt) yield { struct(lit(c).alias(column), col(c).alias(row)) }): _*)
val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
val cols = toPreserve.map(col _) ++ { for (x <- List(column, row)) yield { col("_vars_and_vals")(x).alias(x) }}
_tmp.select(cols: _*)
}
val cNullState = melt(preserves, melts, "Date", "Confirmed", confirmed).withColumn("Date", date1(col("Date")))
Кроме того, это явление нестабильно, иногда случается, иногда нет.
Я понимаю основы «Задача не сериализуема» о отправке кода на каждый узел и т. Д. c., Но только в этом c пример, я не смог понять.
- Что не так с этим кодом?
- Если что-то не так, почему spark-submit работает нормально?
- Если в этом нет ничего плохого, почему искра-оболочка или дирижабль выдают исключение?
ОБНОВЛЕНИЕ: Я нашел причину, хотя и не полностью понял: это вызвано
.withColumn("Date", date1(col("Date")))
, где date1
udf содержит что-то из java раз. Но почему java время имеет проблему, я не знаю. Заголовок обновлен и теперь содержит «java время».