Scala Spark Task не сериализуемая ошибка в коде - PullRequest
0 голосов
/ 18 февраля 2020

Не уверен, что не так с кодом ниже, но выдает ошибку

org.apache.spark.SparkException: Task not serializable

. Погуглил насчет ошибки, но не смог ее решить.

Ниже приведен код: (Копировать можно вставить и выполнить на community.cloud.databricks.com, создав новый блокнот Scala)

    import com.google.gson._    
     object TweetUtils {
          case class Tweet(
               id : String,
               user : String,
               userName : String,
               text : String,
               place : String,
               country : String,
               lang : String
          ) 

       def parseFromJson(lines:Iterator[String]):Iterator[Tweet] = {
            val gson = new Gson
            lines.map( line => gson.fromJson(line, classOf[Tweet]))     
       }

       def loadData(): RDD[Tweet] = { 
           val pathToFile = "/FileStore/tables/reduced_tweets-57570.json"
           sc.textFile(pathToFile).mapPartitions(parseFromJson(_))
       }

       def tweetsByUser(): RDD[(String, Iterable[Tweet])] = {
           val tweets = loadData
           tweets.groupBy(_.user)    
       }  
   } 

   val res = TweetUtils.tweetsByUser()
   res.collect().take(5).foreach(println)

Ниже приведено подробное сообщение об ошибке:

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2548)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:826)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$TweetUtils$.loadData(command-3696793732897971:22)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$TweetUtils$.tweetsByUser(command-3696793732897971:25)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3696793732897971:30)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3696793732897971:84)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-3696793732897971:86)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-3696793732897971:88)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw$$iw.<init>(command-3696793732897971:90)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw$$iw.<init>(command-3696793732897971:92)
    at line05db6d250e4b42e2b2c1d6b97ba83df533.$read$$iw$$iw.<init>(command-3696793732897971:94)

Заранее спасибо,

Шри

Ответы [ 2 ]

0 голосов
/ 20 февраля 2020

Наконец, то, что сработало, - это совместная реализация обоих предложений от «Артема Алиева» и «Партхи». то есть, перемещая "case class Tweet" за пределы "TweetUtils object", а также расширяя Object, объект TweetUtils расширяет Serializable "

Спасибо вам обоим.

0 голосов
/ 18 февраля 2020

Сделайте ваш TweetUtils объект как Serializable, и он должен работать:

object TweetUtils extends Serializable

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...