У нас есть задание Flink, написанное на Scala с использованием классов case (сгенерированных из файлов avsc avrohugger) для представления нашего состояния. Мы хотели бы использовать Avro для сериализации нашего состояния, чтобы миграция состояний работала при обновлении наших моделей. Мы поняли, так как Flink 1.7 Avro поддерживает сериализацию OOTB. Мы добавили модуль flink-avro в путь к классам, но при восстановлении из сохраненного снимка мы заметили, что он все еще пытается использовать сериализацию Kryo. Соответствующий фрагмент кода
case class Foo(id: String, timestamp: java.time.Instant)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = env.getConfig
conf.disableForceKryo()
conf.enableForceAvro()
val rawDataStream: DataStream[String] = env.addSource(MyFlinkKafkaConsumer)
val parsedDataSteam: DataStream[Foo] = rawDataStream.flatMap(new JsonParser[Foo])
// do something useful with it
env.execute("my-job")
При выполнении миграции состояния на Foo
(например, путем добавления поля и развертывания задания) я вижу, что он пытается десериализоваться с использованием Kryo, что, очевидно, не удается. Как я могу убедиться, что используется сериализация Avro?
UPDATE
Обнаружено около https://issues.apache.org/jira/browse/FLINK-10897,, поэтому сериализация состояния POJO с Avro поддерживается только с версии 1.8 afaik. Я попробовал это, используя последнюю версию RC 1.8 с простым WordCount POJO, который простирается от SpecificRecord:
/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch
case class WordWithCount(var word: String, var count: Long) extends
org.apache.avro.specific.SpecificRecordBase {
def this() = this("", 0L)
def get(field$: Int): AnyRef = {
(field$: @switch) match {
case 0 => {
word
}.asInstanceOf[AnyRef]
case 1 => {
count
}.asInstanceOf[AnyRef]
case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
}
def put(field$: Int, value: Any): Unit = {
(field$: @switch) match {
case 0 => this.word = {
value.toString
}.asInstanceOf[String]
case 1 => this.count = {
value
}.asInstanceOf[Long]
case _ => new org.apache.avro.AvroRuntimeException("Bad index")
}
()
}
def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}
object WordWithCount {
val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(" .
{\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\":
[{\"name\":\"word\",\"type\":\"string\"},
{\"name\":\"count\",\"type\":\"long\"}]}")
}
Это, однако, также не работало из коробки. Затем мы попытались определить нашу собственную информацию о типе, используя AvroTypeInfo от flink-avro, но это не удалось, потому что Avro ищет в классе свойство SCHEMA $ (SpecificData: 285) и не может использовать отражение Java для идентификации SCHEMA $ в сопутствующем объекте Scala. .