Проблема с Kryo и protobuf в Spark - PullRequest
0 голосов
/ 29 августа 2018

Я получаю объект protobuf из Kafka в своём приложении Spark Streaming, которое использует сериализатор Kryo.

Maven зависимость:

 <dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.42</version>
    </dependency>

код конфигурации:

.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator","com.xylink.ai.common.KryoRegister.Meeting2EsRegister")

Класс регистратора:

/** Kryo registrator referenced by the `spark.kryo.registrator` setting.
  *
  * Installs two custom serializers: one for the runtime class of
  * `Collections.EMPTY_LIST` and one for `userStatProtobuf`.
  */
class Meeting2EsRegister extends KryoRegistrator {

  /** Registers the custom serializers on the supplied Kryo instance.
    *
    * NOTE(review): the stack trace accompanying this snippet shows
    * `FieldSerializer` reading `userStatProtobuf$Statistics`, so the nested
    * message classes are presumably not covered by the registration of
    * `userStatProtobuf` below — confirm.
    *
    * @param kryo the Kryo instance to register serializers on
    */
  override def registerClasses(kryo: Kryo): Unit = {
    // Runtime class of the shared immutable empty list.
    val emptyListClass = Collections.EMPTY_LIST.getClass
    kryo.register(emptyListClass, new CollectionsEmptyListSerializer())

    val protobufClass = classOf[userStatProtobuf]
    kryo.register(protobufClass, new ProtobufSerializer())
  }
}

информация об ошибке:

    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
videoSendQuality_ (com.xylink.ai.common.userStatProtobuf$Statistics)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
    at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
    at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:159)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
    at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:127)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    ... 35 more

Это поле представляет собой список, который задан в protobuf-классе, как показано ниже:

videoSendQuality_ = java.util.Collections.emptyList();

Всё работает, когда я добавляю в Meeting2EsRegister следующий код, но я не понимаю, почему:

UnmodifiableCollectionsSerializer.registerSerializers(kryo)
...