Flink 1.9.0 - Сбой десериализации состояния после изменения объекта состояния - PullRequest
0 голосов
/ 09 апреля 2020

После изменения объекта состояния наше состояние не загружается. (не удивительно)

Наш объект состояния выглядит примерно так:

public class StateHolder {
    private Set<Object1> objects1 = new HashSet<>();
    private Set<Object2> objects2 = new HashSet<>();
    // 4 more sets of objects, Object3 to Object6 let's call them that
    // no args constructor, and getters and setters
...
}

И он используется так:

ValueStateDescriptor<StateHolder> aggregateValueStateDescriptor = new ValueStateDescriptor<>(
                getDescriptorNamePrefix(STATE_PREFIX, STATE_NAME_COMMAND_AGGREGATE, DATE_OF_STATE_CREATION),
                TypeInformation.of(new TypeHint<StateHolder>() {
                })
        );
        commandAggregateState = getRuntimeContext().getState(aggregateValueStateDescriptor);

Недавно мы добавили поле в Object3. Мы устанавливаем значение по умолчанию для него. Это строка, и она определяется как: private String newField = "";

После этого мы получаем следующее исключение:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)

Очевидно, что наши наборы десериализованы с использованием KryoSerializer, и я предполагаю, что было что-то ожидать Предположительно, не идеальный, хотя.

Что я не понимаю, так это почему это не так и как решить эту проблему? Таким образом, мы добавили поле в класс Object3, но десериализация не удалась в поле objects5 (Set<Object5>). Может ли новое поле вызвать неожиданное смещение буфера, чтобы регистрационный идентификатор был прочитан с неправильной позиции?

Наш POJO соответствует всем правилам, перечисленным в документации (возможно, это может быть неактуально, так как происходит сбой десериализации Kryo):

* The class is public and standalone (no non-static inner class)
* The class has a public no-argument constructor
* All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

При поиске исключения, были некоторые предложения, такие как реализация собственного Serializer. Согласитесь ли вы, что это правильный подход? Если так, то, покинув Google, я только узнал, как создавать собственные сериализаторы Avro и Protobuff, ни один из которых мы не используем для сохранения состояния. По-видимому, наши данные сериализуются с помощью PojoSerializer, который использует различные KryoSerializer для десериализации наших наборов, и один из них, очевидно, дает сбой. Должны ли мы создать новый пользовательский PojoSerializer с пользовательским KryoSerializer для этого ошибочного поля, и если да, то как должен выглядеть этот пользовательский KryoSerializer? Если у нас есть жестко закодированный регистрационный идентификатор, который соответствует java.util.Set?

Спасибо за чтение всего этого, и я прошу прощения, если это дублирующий вопрос. Мне не удалось найти достаточно одного подобного.

ОБНОВЛЕНИЕ

Итак, после предложения переместить инициализацию нового поля в значение по умолчанию в NoArgsConstructor, наше оригинальное исключение просто завернутый.

java.lang.RuntimeException: Error while getting state
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
at fake.package.pipeline.handler.codebook.function.CommandHandlerCodeBookFunction.initializeState(CommandHandlerCodeBookFunction.java:88)
at fake.package.pipeline.handler.Handler.open(Handler.java:104)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.open(KeyedCoProcessOperator.java:62)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:213)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:603)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:532)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:210)
... 21 more
...