FlinkSQL не может зарегистрировать динамическую вложенную схему - PullRequest
0 голосов
/ 13 января 2019

Описание: когда я получил сообщение кафки, это выглядит так:

{
"name":"xiaoming",
"skill":[{"language1":"java"},{"language2":"scala"}]
}   

Итак, как я могу определить вложенную схему для поля с именем «skill», которое я могу использовать flinksql, следующим образом:

select skill.language1 from tableName

Следующие аннотации кода доступны для справки

tableEnv.connect
(
  newKafka().version("0.11").topic("clicks")
 .property("zookeeper.connect", "localhost")
 .property("group.id", "click- group")
 .startFromEarliest())
 .withFormat(new Json().jsonSchema("{...}").failOnMissingField(false)
)
.withSchema(new Schema()
       .field("user-name", "VARCHAR").from("u_name")
       .field("count", "DECIMAL")
       .field("proc-time", "TIMESTAMP").proctime())
.inAppendMode()
.registerSource("MyTable")
...