Я использовал KSQL для создания потока и агрегированной таблицы из этого потока.
{
"ksql":"DROP Stream IF EXISTS StreamLegacyNames; DROP Stream IF EXISTS StreamLegacy; CREATE Stream StreamLegacy (payload STRUCT<AgeYr varchar>)WITH (KAFKA_TOPIC='eip-legacy-13',VALUE_FORMAT='JSON' ); CREATE Stream StreamLegacyNames As Select payload->AgeYr Age from StreamLegacy; Create Table DimAge As SELECT Age FROM StreamLegacyNames Group By Age;",
"streamsProperties":{
"ksql.streams.auto.offset.reset":"earliest"
}
}
Какой самый простой способ экспортировать этот код в таблицу sql?Мы используем соединитель jdbc для темы, но мне неясно, сработает ли это для агрегированной таблицы KSQL (в этом примере DIMAGE).
Даже если я задаю для темы значение DIMAGE и следующее в конфигурации соединения jdbcфайл.
value.converter.schemas.enable=false
Полный файл конфигурации:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=PASSWORD
auto.evolve=true
topics=DIMAGE
tasks.max=1
connection.user=USER
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:sqlserver://SERVER
Я получаю следующую ошибку в соединителе.
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Запрос KSQL через почтальона показываетформат KTABLE как
{"row":{"columns":["83"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["74"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["36"]},"errorMessage":null,"finalMessage":null}