Задача не сериализуема: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String $ IntWrapper - PullRequest
0 голосов
/ 27 февраля 2019

В настоящее время я работаю над переносом таблицы кустов.Проблема, с которой я сталкиваюсь, заключается в том, что, когда я пытаюсь загрузить его в информационный фрейм в искре и записать обратно в Hive, возникает ошибка сериализации, как на скриншоте.Тем не менее, когда я пытаюсь выполнить преобразование Dataframe с ~ 60 столбцами, он работает нормально.У нас есть приблизительно 200 столбцов, которые нам нужны из 1800 столбцов в таблице.Я думаю, что это из-за размера таблицы, которая вызывает эту ошибку.

Я думаю, что ошибка сериализации вызвана «Spark версии 2.2.0» (используемой в наших кластерах разработки).Существует ошибка, связанная с «org.apache.spark.unsafe.types.UTF8String $ IntWrapper», которая вызывает эту ошибку в этой версии.В версиях Spark 2.2.1 и выше эта ошибка была устранена в соответствии с отчетом о проблеме.В качестве поворота к этому я попытался обернуть развернутую флягу с обновленными зависимостями.Я все еще сталкиваюсь с той же проблемой, потому что я думаю, что она все еще использует jar-файлы по умолчанию из среды разработки.Я был бы очень благодарен, если есть какая-либо альтернатива для его запуска.

Ссылка на ошибку apache = https://issues.apache.org/jira/browse/SPARK-21445

Ссылка на решение = https://github.com/apache/spark/pull/18660/commits/d2202903518b3dfa0f4a719a0b9cb5431088ed66

ВРешение проблемы они написали код в Java, я пишу в Scala.Я хочу знать, как импортировать классы "IntWrapper" и "LongWrapper" из библиотеки UTF8String и позволить программе использовать мои объявленные переменные.Для этого я написал классы, взяв класс в ссылке в качестве ссылки и используя KryoRegistrator в spark, чтобы вызвать эти классы к моей «сессии сеанса».Это правильно?bsTst3.scala

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.sources.IsNotNull
    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.SerializableWritable
    import java.io._

    @SerialVersionUID(0L)
    class IntWrapper extends  Serializable{
      @transient var value:Int=0 
    }

    @SerialVersionUID(1L)  
    class LongWrapper extends  Serializable{
      @transient var value:Long=0 

    }

    object bsTst3 {
      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().appName("DrfApp")
                     .config("spark.kryo.registrator", "bsKryoRegistrator")
                    .config("spark.kryoserializer.buffer", "1024k")
                    .config("spark.kryoserializer.buffer.max", "1024m") 
                    .enableHiveSupport().getOrCreate()
        val bDRF = spark.sql("select * from global.table_partition1 limit 10")                

        import spark.implicits._

        bDRF.write.saveAsTable("katukuri.sample3")

      }
    }

и это класс scala - это то, что я вызываю из основного объекта bsKryoRegistrator.scala

    import org.apache.spark.serializer.KryoRegistrator
    import com.esotericsoftware.kryo.Kryo;
    import org.apache.hadoop.io.NullWritable

    class bsKryoRegistrator extends KryoRegistrator {

      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Byte])
        kryo.register(classOf[Short])
        //kryo.register(classOf[Int])
        //kryo.register(classOf[Long])
        kryo.register(classOf[IntWrapper])
        kryo.register(classOf[LongWrapper])
        kryo.register(classOf[Float])
        kryo.register(classOf[Double])
        kryo.register(classOf[String])
        kryo.register(classOf[Boolean])
        kryo.register(classOf[Char])
        kryo.register(classOf[Null])
        kryo.register(classOf[Nothing])
        //kryo.register(classOf[None])
        kryo.register(classOf[NullWritable])
      }

    }

команда, которую я использую для запуска на своем кластере dev

spark2-submit --class bsTst3 --master yarn --queue root.default  --deploy-mode client BSDrdTest3-0.0.1-SNAPSHOT.jar

ошибка, которую я получаю, которая похожа на ту, что в ссылке на ошибку

    19/02/27 18:06:14 INFO spark.SparkContext: Starting job: saveAsTable at bsTst3.scala:37
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Registering RDD 6 (saveAsTable at bsTst3.scala:37)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Got job 0 (saveAsTable at bsTst3.scala:37) with 1 output partitions
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (saveAsTable at bsTst3.scala:37)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
19/02/27 18:06:14 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[6] at saveAsTable at bsTst3.scala:37), which has no missing parents
19/02/27 18:06:14 INFO cluster.YarnScheduler: Cancelling stage 0
19/02/27 18:06:14 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (saveAsTable at bsTst3.scala:37) failed in Unknown s due to Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper
Serialization stack:
        - object not serializable (class: org.apache.spark.unsafe.types.UTF8String$IntWrapper, value: org.apache.spark.unsafe.types.UTF8String$IntWrapper@63ec9c06)
        - field (class: org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$1, name: result$2, type: class org.apache.spark.unsafe.types.UTF8String$IntWrapper)
...