Как использовать сериализацию Avro для классов дел в Scala с Flink 1.7? - PullRequest
0 голосов
/ 20 марта 2019

У нас есть задание Flink, написанное на Scala с использованием классов case (сгенерированных из файлов avsc avrohugger) для представления нашего состояния. Мы хотели бы использовать Avro для сериализации нашего состояния, чтобы миграция состояний работала при обновлении наших моделей. Мы поняли, так как Flink 1.7 Avro поддерживает сериализацию OOTB. Мы добавили модуль flink-avro в путь к классам, но при восстановлении из сохраненного снимка мы заметили, что он все еще пытается использовать сериализацию Kryo. Соответствующий фрагмент кода

case class Foo(id: String, timestamp: java.time.Instant)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = env.getConfig
conf.disableForceKryo()
conf.enableForceAvro()

val rawDataStream: DataStream[String] = env.addSource(MyFlinkKafkaConsumer)

val parsedDataSteam: DataStream[Foo] = rawDataStream.flatMap(new JsonParser[Foo])

// do something useful with it

env.execute("my-job")

При выполнении миграции состояния на Foo (например, путем добавления поля и развертывания задания) я вижу, что он пытается десериализоваться с использованием Kryo, что, очевидно, не удается. Как я могу убедиться, что используется сериализация Avro?

UPDATE

Обнаружено около https://issues.apache.org/jira/browse/FLINK-10897,, поэтому сериализация состояния POJO с Avro поддерживается только с версии 1.8 afaik. Я попробовал это, используя последнюю версию RC 1.8 с простым WordCount POJO, который простирается от SpecificRecord:

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch

case class WordWithCount(var word: String, var count: Long) extends 
  org.apache.avro.specific.SpecificRecordBase {
  def this() = this("", 0L)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        word
      }.asInstanceOf[AnyRef]
      case 1 => {
        count
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.word = {
        value.toString
      }.asInstanceOf[String]
      case 1 => this.count = {
        value
      }.asInstanceOf[Long]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = WordWithCount.SCHEMA$
}

object WordWithCount {
     val SCHEMA$ = new org.apache.avro.Schema.Parser().parse(" . 
       {\"type\":\"record\",\"name\":\"WordWithCount\",\"fields\": 
       [{\"name\":\"word\",\"type\":\"string\"}, 
       {\"name\":\"count\",\"type\":\"long\"}]}")
}

Это, однако, также не работало из коробки. Затем мы попытались определить нашу собственную информацию о типе, используя AvroTypeInfo от flink-avro, но это не удалось, потому что Avro ищет в классе свойство SCHEMA $ (SpecificData: 285) и не может использовать отражение Java для идентификации SCHEMA $ в сопутствующем объекте Scala. .

1 Ответ

0 голосов
/ 13 мая 2019

Я никогда не мог заставить себя задуматься из-за того, что поля Скалы были частными под капотом.AFAIK единственное решение состоит в том, чтобы обновить Flink для использования конструкторов avro без отражения в AvroInputFormat ( сравни ).

В крайнем случае, кроме Java, одинможет использовать авро GenericRecord, возможно, использовать avro4s для генерации их из формата Standard avrohugger (обратите внимание, что Avro4s будет генерировать свою собственную схему из сгенерированных типов Scala)

...