Я получаю объект protobuf от Kafka в моем приложении Sparkstreaming, которое использует сериализатор Kryo.
Maven зависимость:
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.42</version>
</dependency>
код конфигурации:
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator","com.xylink.ai.common.KryoRegister.Meeting2EsRegister")
Класс регистратора:
class Meeting2EsRegister extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register( Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer() )
kryo.register(classOf[userStatProtobuf], new ProtobufSerializer())
}
}
информация об ошибке:
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
videoSendQuality_ (com.xylink.ai.common.userStatProtobuf$Statistics)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:159)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:127)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 35 more
и поле представляет собой список, указанный в protobuf как показано ниже:
videoSendQuality_ = java.util.Collections.emptyList();
хорошо, это работает, когда я добавляю этот код в Meeting2EsRegister, но я не знаю, почему?
UnmodifiableCollectionsSerializer.registerSerializers(kryo)