Как проще всего синхронизировать Kafka KTable с базой данных SQL? - PullRequest
0 голосов
/ 03 января 2019

Я использовал KSQL для создания потока и агрегированной таблицы из этого потока.

  {  
   "ksql":"DROP Stream IF EXISTS StreamLegacyNames; DROP Stream IF EXISTS StreamLegacy; CREATE Stream  StreamLegacy (payload  STRUCT<AgeYr  varchar>)WITH (KAFKA_TOPIC='eip-legacy-13',VALUE_FORMAT='JSON' );  CREATE Stream  StreamLegacyNames As Select payload->AgeYr Age from StreamLegacy; Create Table DimAge As SELECT Age FROM StreamLegacyNames Group By Age;",
   "streamsProperties":{  
      "ksql.streams.auto.offset.reset":"earliest"
   }
}

Какой самый простой способ экспортировать этот код в таблицу sql?Мы используем соединитель jdbc для темы, но мне неясно, сработает ли это для агрегированной таблицы KSQL (в этом примере DIMAGE).

Даже если я задаю для темы значение DIMAGE и следующее в конфигурации соединения jdbcфайл.

value.converter.schemas.enable=false

Полный файл конфигурации:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=PASSWORD
auto.evolve=true
topics=DIMAGE
tasks.max=1
connection.user=USER
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:sqlserver://SERVER

Я получаю следующую ошибку в соединителе.

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Запрос KSQL через почтальона показываетформат KTABLE как

{"row":{"columns":["83"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["74"]},"errorMessage":null,"finalMessage":null}
{"row":{"columns":["36"]},"errorMessage":null,"finalMessage":null}

Ответы [ 2 ]

0 голосов
/ 04 января 2019

Когда вы CREATE STREAM foo AS SELECT («CSAS») в KSQL, вы создаете новую тему Kafka и постоянно заполняете ее результатами оператора SELECT.

Таким образом, у вас есть тема Kafka, в вашем случае она называется STREAMLEGACYNAMES (KSQL обычно переводит объекты в верхний регистр). Вы можете использовать Соединитель JDBC Sink для потоковой передачи этой темы в целевую СУБД, включая MS SQL.

0 голосов
/ 03 января 2019

KTable - это просто еще одна тема в конце дня. Вы можете использовать KSQL PRINT или kafka-console-consumer, чтобы посмотреть, какие данные будет получать соединитель JDBC Sink.

Если вы предполагаете, что таблица KSQL будет точно соответствовать таблице SQL Server, то это не так. В таблице SQL Server у вас будет каждая и всякая «строка события», произошедшая в KTable, включая нулевые значения, поскольку удаления еще не поддерживаются приемником JDBC.


Не уверен, какие данные вы ожидаете, но то, что вы могли бы сделать, это выполнить оконный вывод событий, которые вы пытаетесь захватить, тогда у вас фактически есть микропакетная вставка в вашу базу данных downsteam.

...