«Задача не сериализуема» со временем java в Spark-shell (или zeppelin), но не в spark-submit - PullRequest
0 голосов
/ 30 марта 2020

Странно, я обнаружил несколько раз, что есть разница при работе с spark-submit против работающей с spark-shell (или zeppelin), хотя я в это не верю.

С некоторыми кодами spark-shell ( или zeppelin) может выдать это исключение, в то время как spark-submit просто отлично работает:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)

Это пример кода (попытается упростить пример), который может вызвать проблему:

import java.time.format.DateTimeFormatter
    import java.time.LocalDate

    def formatter1 = DateTimeFormatter.ofPattern("MM_dd_yy")
    val date1 = udf((date: String) => {val d = date.split("_").map(x => {if (x.length < 2) "0" + x else x}).mkString("_"); LocalDate.from(formatter1.parse(d)).toString})

import org.apache.spark.sql.{DataFrame}
    def melt(toPreserve: Seq[String], toMelt: Seq[String], column: String, row: String, df: DataFrame) : DataFrame = {
      val _vars_and_vals = array((for (c <- toMelt) yield { struct(lit(c).alias(column), col(c).alias(row)) }): _*)
      val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
      val cols = toPreserve.map(col _) ++ { for (x <- List(column, row)) yield { col("_vars_and_vals")(x).alias(x) }}
      _tmp.select(cols: _*)
    }
val cNullState = melt(preserves, melts, "Date", "Confirmed", confirmed).withColumn("Date", date1(col("Date")))

Кроме того, это явление нестабильно, иногда случается, иногда нет.

Я понимаю основы «Задача не сериализуема» о отправке кода на каждый узел и т. Д. c., Но только в этом c пример, я не смог понять.

  1. Что не так с этим кодом?
  2. Если что-то не так, почему spark-submit работает нормально?
  3. Если в этом нет ничего плохого, почему искра-оболочка или дирижабль выдают исключение?

ОБНОВЛЕНИЕ: Я нашел причину, хотя и не полностью понял: это вызвано

.withColumn("Date", date1(col("Date")))

, где date1 udf содержит что-то из java раз. Но почему java время имеет проблему, я не знаю. Заголовок обновлен и теперь содержит «java время».

...