Как я могу сделать Kryo Serializer - PullRequest
0 голосов
/ 27 апреля 2020

Я попытался решить следующую проблему, предоставив Kryo Serializer, но он все еще не работает. Он не мог распознать сериализатор ModelCom. Кроме того, любые сообщения по функции печати не отображаются.

Я использовал Apache Flink 1.9.0 и Apache Jena 3.10.0

Мой код в Kotlin:

val serializer = object : Serializer<Model>(){
            override fun write(kryo: Kryo, output: Output?, obj : Model?) {
                print("write")
                kryo.writeClassAndObject(output, obj)
            }

            override fun read(kryo: Kryo, input: Input?, type: Class<Model>?): Model {
                print("read")
                val m = kryo.readObject(input, Model::class.java)
                return m
            }

        }


ExecutionContext.see.config.registerTypeWithKryoSerializer(ModelCom::class.java, serializer::class.java)

Ошибка

Exception in thread "main" org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
    at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:222)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:460)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:272)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:207)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:159)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
    at org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
    at core.EgressEngine.start(EgressEngine.kt:187)
    at core.EgressEngineKt.main(EgressEngine.kt:45)
Caused by: java.io.NotSerializableException: org.apache.jena.rdf.model.impl.ModelCom
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
    at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:219)
    ... 14 more

1 Ответ

0 голосов
/ 27 апреля 2020

Модели Jena не сериализуемы, поэтому этот подход не сработает. Вместо этого вы можете отправить только достаточно сериализованных данных, чтобы каждый экземпляр, которому нужна модель, мог создать его экземпляр.

См. этот поток из списка пользователей jena о том, как решить это для Spark; основные проблемы одинаковы для любой платформы на основе JVM, которая распределяет вычисления.

...