Описание:
когда я получил сообщение кафки, это выглядит так:
{
"name":"xiaoming",
"skill":[{"language1":"java"},{"language2":"scala"}]
}
Итак, как я могу определить вложенную схему для поля с именем «skill», которое я могу использовать flinksql, следующим образом:
select skill.language1 from tableName
Следующие аннотации кода доступны для справки
tableEnv.connect
(
newKafka().version("0.11").topic("clicks")
.property("zookeeper.connect", "localhost")
.property("group.id", "click- group")
.startFromEarliest())
.withFormat(new Json().jsonSchema("{...}").failOnMissingField(false)
)
.withSchema(new Schema()
.field("user-name", "VARCHAR").from("u_name")
.field("count", "DECIMAL")
.field("proc-time", "TIMESTAMP").proctime())
.inAppendMode()
.registerSource("MyTable")