Метод Spark Scala fit для Dataframe создает исключение NotSerializableException - PullRequest
0 голосов
/ 26 марта 2020

Я пытаюсь использовать Spark ML's Random Forest Classifier в Scala. Я получаю сообщение об ошибке «org. apache .spark.SparkException: Task not serializable». Я прошел через подобные вопросы в StackOverflow, но ничего не соответствовало моему контексту. Ниже приведен мой код:

object SparkSql extends Serializable{
    def main(args: Array[String]){
        val spark = SparkSession.builder.master("local").appName("Spark Demo").getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        val df = spark.read.option("header", "true").option("inferSchema", "true").csv("C:\\data\\Customer-Churn.csv")
        val updatedDf = df.withColumn("TotalCharges", when(col("TotalCharges").equalTo(" "), null).otherwise(col("TotalCharges")))
        val updatedDf_2 = updatedDf.withColumn("TotalCharges",col("TotalCharges").cast(DoubleType))
        val cols = Array("MonthlyCharges", "TotalCharges")

        val assembler = new VectorAssembler().setInputCols(cols).setOutputCol("features")
        val featureDf = assembler.transform(updatedDf_2)
        featureDf.printSchema()

        val indexer = new StringIndexer().setInputCol("Churn").setOutputCol("label")
        val labelDf = indexer.fit(featureDf).transform(featureDf)
        labelDf.printSchema()

        val Array(trainingData, testData) = labelDf.randomSplit(Array(0.7, 0.3), seed = 11L)
        // train Random Forest model with training data set
        val randomForestClassifier = new RandomForestClassifier()
            .setLabelCol("label")
            .setFeaturesCol("features")
            .setNumTrees(50)
            .setMaxDepth(10)

        println(trainingData.show(5))       
        val randomForestModel = randomForestClassifier.fit(trainingData)
        val predictions = randomForestModel.transform(testData)

Я получаю ниже ошибки при выполнении этого кода:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1364)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:112)
at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:106)
at org.apache.spark.ml.classification.RandomForestClassifier.$anonfun$train$1(RandomForestClassifier.scala:141)
at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:209)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:120)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:46)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at demo.scala.SparkSql$.main(SparkSql.scala:98)
at demo.scala.SparkSql.main(SparkSql.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef

Пожалуйста, помогите мне решить эту проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...