Как мне определить тип поля, начинающегося с @? - PullRequest
0 голосов
/ 14 июля 2020

Я пытаюсь создать поток из некоторых сообщений kakfa в формате json, например:

"beat": {
        "name": "xxxxxxx",
        "hostname": "xxxxxxxxxx",
        "version": "zzzzz"
    },
"log_instance": "forwarder-2",
"type": "prod",
"message": "{ ... json string.... }",
"@timestamp": "2020-06-14T23:31:33.925Z",
"input_type": "log",
"@version": "1"
}

Я пытался использовать

CREATE STREAM  S (
    beat STRUCT<
        name VARCHAR,
        hostname VARCHAR,
        version VARCHAR
    >,
    log_instance VARCHAR,
    type VARCHAR,
    message VARCHAR,   # for brevity - I also have this with a struct
    @timestamp VARCHAR,
    input_type VARCHAR,
    @version` VARCHAR )
WITH (KAFKA_TOPIC='some_topic', VALUE_FORMAT='JSON'); 

Однако получаю сообщение об ошибке:

Caused by: line 10:5: extraneous input '@' expecting ....

Я связал цитату и предшествующее подчеркивание, но не повезло. Я также попытался создать запись в реестре, но мне не удалось создать таким образом файл git avro.

PS. Как "привязать" topi c к схеме реестра?

Спасибо.

1 Ответ

0 голосов
/ 29 июля 2020

Если вы используете достаточно свежую версию ksqlDB, тогда простое цитирование имен столбцов с недопустимыми символами должно работать:

CREATE STREAM  S (
    beat STRUCT<
        name VARCHAR,
        hostname VARCHAR,
        version VARCHAR
    >,
    log_instance VARCHAR,
    type VARCHAR,
    message VARCHAR,   # for brevity - I also have this with a struct
    `@timestamp` VARCHAR,
    input_type VARCHAR,
    `@version` VARCHAR )
WITH (KAFKA_TOPIC='some_topic', VALUE_FORMAT='JSON'); 

Если вышеперечисленное не работает, вероятно, вы используете старая версия ksqlDB. Обновление должно решить эту проблему.

PS. Как «привязать» topi c к схеме реестра?

ksqlDB автоматически опубликует sh схему JSON в реестре схем, если вы используете формат JSON_SR , а не просто JSON. Последний поддерживает только чтение схемы из реестра схем.

Если вас больше интересует, как вы регистрируете схему в SR для существующего topi c ... тогда вы Лучше всего посмотреть документы SR. Обратите внимание: ksqlDB поддерживает только стратегию именования TopicNameStrategy . Схема значений имеет тему {topic-name}-value, например, следующие регистры - это схема JSON для значений test topi c.

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/test-value/versions

Дополнительную информацию см. В руководстве по SR: https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html

Я также пытался создать запись в реестре, но мне не удалось создать файл git avro таким образом.

Avro делает не допускать @ в названиях полей. Однако похоже, что ваши данные имеют формат JSON, что позволяет @. См. Приведенный выше пример curl о том, как зарегистрировать схему JSON.

...