Флинк Авро состояние эволюция - PullRequest
0 голосов
/ 15 января 2019

У меня есть топология Flink, которая использует ListState [MyAvroClass] в CoProcessFunction.

В моей функции CoProcess я инициализирую

var myState: ListState[MyAvroClass] = _

override def open(parameters: Configuration): Unit = {
  val avroSerializer = new AvroSerializer[MyAvroClass](classOf[MyAvroClass])
  myState = getRuntimeContext.getListState[MyAvroClass](new ListStateDescriptor[MyAvroClass]("myState", avroSerializer))
}

Все работает нормально, но я не могу справиться с эволюцией схемы avro. Даже если я перезапускаю свою работу из точки сохранения, у меня возникают проблемы с десериализацией данных, например, когда я выполняю итератор через listState.

Я продолжаю иметь ошибку:

java.lang.ArrayIndexOutOfBoundsException: 738197505
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:201)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:147)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)

Кажется, он пытается получить поле, которого нет в предыдущей версии моего аврокласса. Любая идея, как я могу решить это?

...