Вызывает ли использование функции в преобразовании не сериализуемые исключения? - PullRequest
0 голосов
/ 30 июня 2018

У меня есть Breeze DenseMatrix, я нахожу mean на строку и mean квадратов на строку и помещаю их в другой DenseMatrix, по одному на столбец. Но я получаю Task Not Serializable исключение. Я знаю, что sc - это не Serializable, но я думаю, что это исключение, потому что я вызываю функции в преобразовании в безопасных зонах.

Я прав? И как может быть возможным сделать это без каких-либо функций? Любая помощь будет отличной!

Код:

object MotitorDetection {
case class MonDetect() extends Serializable {

var sc: SparkContext = _
var machines: Int=0
var counters: Int=0
var GlobalVec= BDM.zeros[Double](counters, 2)

def findMean(a: BDM[Double]): BDV[Double] = {
  var c = mean(a(*, ::))
  c}

def toMatrix(x: BDV[Double], y: BDV[Double], C: Int): BDM[Double]={
  val m = BDM.zeros[Double](C,2)
  m(::, 0) := x
  m(::, 1) := y
  m}

def SafeZones(stream: DStream[(Int, BDM[Double])]){

  stream.foreachRDD { (rdd: RDD[(Int, BDM[Double])], _) =>
    if (isEmpty(rdd) == false) {

      val InputVec = rdd.map(x=> (x._1, toMatrix(findMean(x._2), findMean(pow(x._2, 2)), counters)))
      GlobalMeanVector(InputVec)
    }}}

Исключение:

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.map(RDD.scala:369)
        at ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1.apply(MotitorDetection.scala:85)
        at ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1.apply(MotitorDetection.scala:82)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6eee7027)
        - field (class: ScalaApps.MotitorDetection$MonDetect, name: sc, type: class org.apache.spark.SparkContext)
        - object (class ScalaApps.MotitorDetection$MonDetect, MonDetect())
        - field (class: ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1, name: $outer, type: class ScalaApps.MotitorDetection$MonDetect)
        - object (class ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1, <function2>)
        - field (class: ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1$$anonfun$2, name: $outer, type: class ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1)
        - object (class ScalaApps.MotitorDetection$MonDetect$$anonfun$SafeZones$1$$anonfun$2, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        ... 28 more

1 Ответ

0 голосов
/ 30 июня 2018

Метод findMean - это метод объекта MotitorDetection. Объект MotitorDetection имеет на борту SparkContext, который не сериализуем. Таким образом, задача, используемая в rdd.map, не сериализуема.

Переместить все функции, связанные с матрицей, в отдельный сериализуемый объект, MatrixUtils, скажем:

object MatrixUtils {
  def findMean(a: BDM[Double]): BDV[Double] = {
    var c = mean(a(*, ::))
    c
  }

  def toMatrix(x: BDV[Double], y: BDV[Double], C: Int): BDM[Double]={
    val m = BDM.zeros[Double](C,2)
    m(::, 0) := x
    m(::, 1) := y
    m
  }

  ...
}

, а затем использовать только те методы из rdd.map(...):

object MotitorDetection {
  val sc = ...

  def SafeZones(stream: DStream[(Int, BDM[Double])]){
    import MatrixUtils._

    ... = rdd.map( ... )

  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...