Как создать KSQL Stream с большим количеством полей JSON из топика в кафке? - PullRequest
0 голосов
/ 01 октября 2018

Я передаю длинную строку JSON String в тему kafka, например:

{
    "glossary": {
        "title": "example glossary",
        "GlossDiv": {
            "title": "S",
            "GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
                    "SortAs": "SGML",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "Acronym": "SGML",
                    "Abbrev": "ISO 8879:1986",
                    "GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                        "GlossSeeAlso": ["GML", "XML"]
                    },
                    "GlossSee": "markup"
                }
            }
        }
    }
}

и хочу создать поток из темы kafka со всеми полями без указания каждого поля в KSQL, например:

 CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');

1 Ответ

0 голосов
/ 02 октября 2018

Если вы хотите, чтобы имена полей автоматически выбирались KSQL, вам нужно использовать Avro.Если вы используете Avro, схема для данных будет зарегистрирована в реестре схемы слияния, и KSQL автоматически получит ее при использовании темы.

Если вы используете JSON, у вас есть , чтобы сообщить KSQL, что это за столбцы.Вы можете сделать это либо в операторе CREATE STREAM, используя тип данных STRUCT для вложенных элементов.

Вы можете выбрать способ обхода списка всех полей, объявив только высокоуровневые поля в CREATE STREAM, а затем получив доступ к вложенным элементам с помощью EXTRACTJSONFIELD для полей, которые вы хотите использовать.Имейте в виду, что существует проблема в 5.0.0, которая будет исправлена ​​в 5.0.1 .Также вы не можете использовать это для вложенных массивов и т. Д., Которые у вас есть в данных примера, которые вы показываете.

...