Flink Как десериализовать черту к делу - PullRequest
0 голосов
/ 21 апреля 2019

Я использую кафку Flink's (1.7.2).Как десериализовать несколько классов дел, которые расширяют ту же черту?например,

import spray.json.{DefaultJsonProtocol, RootJsonFormat}
  trait Foo
  case class Boo(name: String) extends Foo
  case class Buzz(name: String, age: Int) extends Foo
  object Formats extends DefaultJsonProtocol{
  implicit val booFormat: RootJsonFormat[Boo] = 
       jsonFormat1(Boo.apply)
  implicit val buzzFormat: RootJsonFormat[Buzz] = 
       jsonFormat2(Buzz.apply)
}

Я использую Kafka для потребителей с этим DeserializationSchema:

class FooSchema extends DeserializationSchema[Foo]{
  @transient lazy val log = LoggerFactory.getLogger(this.getClass)

  implicit val typeInfo = createTypeInformation[Foo]

  override def deserialize(bytes: Array[Byte]): Foo = {
      val foo = new String(bytes, StandardCharsets.UTF_8).parseJson
        .convertTo[Foo] //doesn't compile, I need to deserialize to Boo and Buzz
      log.debug(s"Received Boo")
      foo

  }
  override def isEndOfStream(t: Foo): Boolean = false
  override def getProducedType: TypeInformation[Foo] = createTypeInformation[Foo]
}

любая идея будет принята с благодарностью

1 Ответ

1 голос
/ 21 апреля 2019

Попробуйте spray-json-shapeless , который может автоматически выводить декодеры для ADT следующим образом:

sealed trait Foo
case class Boo(name: String) extends Foo
case class Buzz(name: String, age: Int) extends Foo

object MyFormats extends DefaultJsonProtocol with FamilyFormats {
  implicit val formats = shapeless.cachedImplicit[JsonFormat[Foo]]
}

Не забудьте сделать черту sealed. Обратите внимание, что необработанный JSON должен содержать type поле двусмысленность , чтобы rawJsonString.parseJson.convertTo[Foo] работал, например

  object Main extends App {
    import MyFormats._

    val rawJsonBuzz =
      """
        |{
        |  "name": "Picard",
        |  "age": 60,
        |  "type": "Buzz"
        |}
      """.stripMargin

    val buzz = rawJsonBuzz.parseJson.convertTo[Foo]
    println(buzz)

    val rawJsonBoo =
      """
        |{
        |  "name": "Picard",
        |  "type": "Boo"
        |}
      """.stripMargin

    val boo = rawJsonBoo.parseJson.convertTo[Foo]
    println(boo)
  }

должен вывести

Buzz(Picard,60)
Boo(Picard)
...