Если вы используете достаточно свежую версию ksqlDB, тогда простое цитирование имен столбцов с недопустимыми символами должно работать:
CREATE STREAM S (
beat STRUCT<
name VARCHAR,
hostname VARCHAR,
version VARCHAR
>,
log_instance VARCHAR,
type VARCHAR,
message VARCHAR, # for brevity - I also have this with a struct
`@timestamp` VARCHAR,
input_type VARCHAR,
`@version` VARCHAR )
WITH (KAFKA_TOPIC='some_topic', VALUE_FORMAT='JSON');
Если вышеперечисленное не работает, вероятно, вы используете старая версия ksqlDB. Обновление должно решить эту проблему.
PS. Как «привязать» topi c к схеме реестра?
ksqlDB автоматически опубликует sh схему JSON в реестре схем, если вы используете формат JSON_SR
, а не просто JSON
. Последний поддерживает только чтение схемы из реестра схем.
Если вас больше интересует, как вы регистрируете схему в SR для существующего topi c ... тогда вы Лучше всего посмотреть документы SR. Обратите внимание: ksqlDB поддерживает только стратегию именования TopicNameStrategy
. Схема значений имеет тему {topic-name}-value
, например, следующие регистры - это схема JSON для значений test
topi c.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}' http://localhost:8081/subjects/test-value/versions
Дополнительную информацию см. В руководстве по SR: https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html
Я также пытался создать запись в реестре, но мне не удалось создать файл git avro таким образом.
Avro делает не допускать @
в названиях полей. Однако похоже, что ваши данные имеют формат JSON, что позволяет @
. См. Приведенный выше пример curl о том, как зарегистрировать схему JSON.