Искра: объект не сериализуем - PullRequest
1 голос
/ 31 марта 2020

У меня есть пакетное задание, которое я пытаюсь преобразовать в структурированную потоковую передачу. Я получаю следующую ошибку:

20/03/31 15:09:23 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: com.apple.ireporter.analytics.compute.AggregateKey
Serialization stack:
    - object not serializable (class: com.apple.ireporter.analytics.compute.AggregateKey, value: d_)

... где "d_" - последняя строка в наборе данных

Это соответствующий фрагмент кода

    df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      import spark.implicits._
      val javaRdd = batchDF.toJavaRDD
      val dataframeToRowColFunction = new RowToColumn(table)
      println("Back to Main class")
      val combinedRdd =javaRdd.flatMapToPair(dataframeToRowColFunction.FlatMapData2).combineByKey(aggrCreateComb.createCombiner,aggrMerge.aggrMerge,aggrMergeCombiner.aggrMergeCombiner)
      // spark.createDataFrame( combinedRdd).show(1); // I commented this
      // combinedRdd.collect() // I added this as a test
    }

Это класс FlatMapData2

  val FlatMapData2: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
    //val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
    override def call(x: Row) = {
      val tuples = new util.ArrayList[Tuple2[AggregateKey, AggregateValue]]
      val decomposedEvents = decomposer.decomposeDistributed(x)
      decomposedEvents.foreach {
        y =>  tuples.add(Tuple2(y._1,y._2))
      }
      tuples.iterator()
    }
  }

Вот совокупный класс ключей

class AggregateKey(var partitionkeys: Map[Int,Any],var clusteringkeys : Map[Int,Any]) extends Comparable [AggregateKey]{
...
}

Я новичок в этом, и любая помощь будет признательна. Пожалуйста, дайте мне знать, если что-то еще нужно добавить

1 Ответ

0 голосов
/ 06 апреля 2020

Мне удалось решить эту проблему, заставив AggregateKey расширить java .io.Serializable

class AggregateKey(var partitionkeys: Map[Int,Any],var clusteringkeys : Map[Int,Any]) extends java.io.Serializable{
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...