У меня есть топология Flink, которая использует ListState [MyAvroClass] в CoProcessFunction.
В моей функции CoProcess я инициализирую
var myState: ListState[MyAvroClass] = _
override def open(parameters: Configuration): Unit = {
val avroSerializer = new AvroSerializer[MyAvroClass](classOf[MyAvroClass])
myState = getRuntimeContext.getListState[MyAvroClass](new ListStateDescriptor[MyAvroClass]("myState", avroSerializer))
}
Все работает нормально, но я не могу справиться с эволюцией схемы avro. Даже если я перезапускаю свою работу из точки сохранения, у меня возникают проблемы с десериализацией данных, например, когда я выполняю итератор через listState.
Я продолжаю иметь ошибку:
java.lang.ArrayIndexOutOfBoundsException: 738197505
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:201)
at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:147)
at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
Кажется, он пытается получить поле, которого нет в предыдущей версии моего аврокласса. Любая идея, как я могу решить это?