Spark "Task not serializable" error when working with a DataFrame

I am using the Spark MLlib recommender (ALS) and want to print the recommended products.

Here is my code:

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

object RecommendBasedView extends App {
  new MachineLearning().doIt()
}

class MachineLearning extends Serializable {
  def doIt(): Unit = {
    val spark = SparkSession.builder.master("local").appName("RecommendBasedView").getOrCreate()

    // the CSV has userId, productionId and count columns
    val data = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .format("csv")
      .load("productionView.csv")
    val Array(training, test) = data.randomSplit(Array(0.8, 0.2))

    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("productionId")
      .setRatingCol("count")
    val model = als.fit(training)

    // drop NaN predictions for users/items unseen during training
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)

    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("count")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    println(s"Root-mean-square error = $rmse")

    // top 10 product recommendations for every user
    val userRecs = model.recommendForAllUsers(10)
    userRecs.show()
  }
}

Whatever I do with the resulting DataFrame, such as show(), collect() or map(), fails with the error below:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.api.TypeTags$PredefTypeCreator
Serialization stack:
  - object not serializable (class: scala.reflect.api.TypeTags$PredefTypeCreator, value: scala.reflect.api.TypeTags$PredefTypeCreator@39cef76d)
- writeObject data (class: scala.reflect.api.SerializedTypeTag)
- object (class scala.reflect.api.SerializedTypeTag, scala.reflect.api.SerializedTypeTag@4f9f3c2f)
- writeReplace data (class: scala.reflect.api.SerializedTypeTag)
- object (class scala.reflect.api.TypeTags$PredefTypeTag, TypeTag[Int])
- field (class: org.apache.spark.ml.recommendation.TopByKeyAggregator, name: org$apache$spark$ml$recommendation$TopByKeyAggregator$$evidence$2, type: interface scala.reflect.api.TypeTags$TypeTag)
- object (class org.apache.spark.ml.recommendation.TopByKeyAggregator, org.apache.spark.ml.recommendation.TopByKeyAggregator@349469db)
- field (class: org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator)
- object (class org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, TopByKeyAggregator(scala.Tuple3))
- field (class: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, name: aggregateFunction, type: class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
- object (class org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, partial_topbykeyaggregator(org.apache.spark.ml.recommendation.TopByKeyAggregator@349469db, Some(newInstance(class scala.Tuple3)), Some(class scala.Tuple3), Some(StructType(StructField(_1,IntegerType,false), StructField(_2,IntegerType,false), StructField(_3,FloatType,false))), encodeusingserializer(input[0, java.lang.Object, true], true) AS value#157, decodeusingserializer(input[0, binary, true], org.apache.spark.util.BoundedPriorityQueue, true), mapobjects(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), if (isnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))) null else named_struct(_1, assertnotnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))._1, _2, assertnotnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))._2), input[0, [Lscala.Tuple2;, true], None) AS value#156, ArrayType(StructType(StructField(_1,IntegerType,false), StructField(_2,FloatType,false)),true), true, 0, 0))
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(partial_topbykeyaggregator(org.apache.spark.ml.recommendation.TopByKeyAggregator@349469db, Some(newInstance(class scala.Tuple3)), Some(class scala.Tuple3), Some(StructType(StructField(_1,IntegerType,false), StructField(_2,IntegerType,false), StructField(_3,FloatType,false))), encodeusingserializer(input[0, java.lang.Object, true], true) AS value#157, decodeusingserializer(input[0, binary, true], org.apache.spark.util.BoundedPriorityQueue, true), mapobjects(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), if (isnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))) null else named_struct(_1, assertnotnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))._1, _2, assertnotnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))._2), input[0, [Lscala.Tuple2;, true], None) AS value#156, ArrayType(StructType(StructField(_1,IntegerType,false), StructField(_2,FloatType,false)),true), true, 0, 0)))
- field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name: aggregateExpressions, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, ObjectHashAggregate(keys=[value#155], functions=[partial_topbykeyaggregator(org.apache.spark.ml.recommendation.TopByKeyAggregator@349469db, Some(newInstance(class scala.Tuple3)), Some(class scala.Tuple3), Some(StructType(StructField(_1,IntegerType,false), StructField(_2,IntegerType,false), StructField(_3,FloatType,false))), encodeusingserializer(input[0, java.lang.Object, true], true) AS value#157, decodeusingserializer(input[0, binary, true], org.apache.spark.util.BoundedPriorityQueue, true), mapobjects(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), if (isnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))) null else named_struct(_1, assertnotnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))._1, _2, assertnotnull(lambdavariable(MapObjects_loopValue39, MapObjects_loopIsNull39, ObjectType(class scala.Tuple2), true))._2), input[0, [Lscala.Tuple2;, true], None) AS value#156, ArrayType(StructType(StructField(_1,IntegerType,false), StructField(_2,FloatType,false)),true), true, 0, 0)], output=[value#155, buf#232])
+- AppendColumnsWithObject <function1>, [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148], [input[0, int, false] AS value#155]
  +- MapPartitions <function1>, obj#145: scala.Tuple3
    +- DeserializeToObject newInstance(class scala.Tuple2), obj#144: scala.Tuple2
    +- CartesianProduct
    :- *SerializeFromObject [mapobjects(MapObjects_loopValue14, MapObjects_loopIsNull14, ObjectType(class scala.Tuple2), if (isnull(lambdavariable(MapObjects_loopValue14, MapObjects_loopIsNull14, ObjectType(class scala.Tuple2), true))) null else named_struct(_1, assertnotnull(lambdavariable(MapObjects_loopValue14, MapObjects_loopIsNull14, ObjectType(class scala.Tuple2), true))._1, _2, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(lambdavariable(MapObjects_loopValue14, MapObjects_loopIsNull14, ObjectType(class scala.Tuple2), true))._2, true)), input[0, scala.collection.Seq, true], None) AS value#125]
    :  +- MapPartitions <function1>, obj#124: scala.collection.Seq
      :     +- DeserializeToObject newInstance(class scala.Tuple2), obj#123: scala.Tuple2
      :        +- *Project [_1#35 AS id#38, _2#36 AS features#39]
      :           +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#35, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#36]
      :              +- Scan ExternalRDDScan[obj#34]
      +- *SerializeFromObject [mapobjects(MapObjects_loopValue19, MapObjects_loopIsNull19, ObjectType(class scala.Tuple2), if (isnull(lambdavariable(MapObjects_loopValue19, MapObjects_loopIsNull19, ObjectType(class scala.Tuple2), true))) null else named_struct(_1, assertnotnull(lambdavariable(MapObjects_loopValue19, MapObjects_loopIsNull19, ObjectType(class scala.Tuple2), true))._1, _2, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(lambdavariable(MapObjects_loopValue19, MapObjects_loopIsNull19, ObjectType(class scala.Tuple2), true))._2, true)), input[0, scala.collection.Seq, true], None) AS value#133]
      +- MapPartitions <function1>, obj#132: scala.collection.Seq
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#131: scala.Tuple2
        +- *Project [_1#46 AS id#49, _2#47 AS features#50]
        +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#46, staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(FloatType,false), fromPrimitiveArray, assertnotnull(input[0, scala.Tuple2, true])._2, true) AS _2#47]
        +- Scan ExternalRDDScan[obj#45]
        )
        - field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1, name: $outer, type: class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec)
        - object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1, <function0>)
          - field (class: org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1)
          - object (class org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2, <function1>)
            - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$23, type: interface scala.Function1)
            - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
              - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
              - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25, <function3>)
                - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
                - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[168] at show at RecommendBasedView.scala:52)
                - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
                - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@4e003bf9)
                - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
                - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@73ce9f16)
                - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
                - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@4e003bf9))
                - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
                - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[169] at show at RecommendBasedView.scala:52)
                - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
                - object (class scala.Tuple2, (MapPartitionsRDD[169] at show at RecommendBasedView.scala:52,org.apache.spark.ShuffleDependency@bf60f8))
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
                at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010)
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:933)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:932)
                at scala.collection.immutable.List.foreach(List.scala:383)
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:932)
                at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1695)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
                at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
                at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
                at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
                at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
                at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
                at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
                at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
                at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
                at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
                at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
                at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
                at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
                at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
                at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
                at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
                at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
                at MachineLearning.doIt(RecommendBasedView.scala:52)
                at RecommendBasedView$.delayedEndpoint$RecommendBasedView$1(RecommendBasedView.scala:14)
                at RecommendBasedView$delayedInit$body.apply(RecommendBasedView.scala:11)
                at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
                at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
                at scala.App$$anonfun$main$1.apply(App.scala:76)
                at scala.App$$anonfun$main$1.apply(App.scala:76)
                at scala.collection.immutable.List.foreach(List.scala:383)
                at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
                at scala.App$class.main(App.scala:76)
                at RecommendBasedView$.main(RecommendBasedView.scala:11)
                at RecommendBasedView.main(RecommendBasedView.scala)

I tried this with Spark 2.2.0 and 2.3.0, and neither works. When I call training.show() it works, but it does not work on the DataFrames produced by the ALS model.

---------------------------- added ----------------------------

Apparently, since Spark 2.x serializability can be traced with SerializationDebugger, which seems to be what produced the output above, but I still don't know what the actual problem is.
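As a sanity check outside of Spark, I used a small helper (plain JDK serialization, nothing Spark-specific; just a sketch for illustration) to see whether a given object is Java-serializable at all:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

def isSerializable(obj: AnyRef): Boolean =
  try {
    // writeObject throws NotSerializableException if obj (or anything it references) cannot be serialized
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  }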

It also works with predictions.show(), but fails on val userRecs = model.recommendForAllUsers(10) followed by userRecs.show().
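Restating that observation as code (referring to the variables defined in the code above), these actions succeed and only the last one fails:

// these work
training.show()
predictions.show()

// this one aborts the job with "Task not serializable"
val userRecs = model.recommendForAllUsers(10)
userRecs.show()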

I also found that it runs from the spark-shell command line with a different error (Illegal character in path at index 36: spark://192.168.0.154:64833/classes//scala.class), but it still prints the result, whereas my IntelliJ project does not.
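For completeness, this is roughly what I ran in spark-shell (a sketch; spark-shell already provides the SparkSession as spark, and I am assuming the same productionView.csv as above):

import org.apache.spark.ml.recommendation.ALS

val data = spark.read.option("header", "true").option("inferSchema", "true").csv("productionView.csv")
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("productionId")
  .setRatingCol("count")
val model = als.fit(data)
model.setColdStartStrategy("drop")
model.recommendForAllUsers(10).show()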

...