Как импортировать таблицы MS Sql Server в KSQL с подключением Kafka - PullRequest
0 голосов
/ 10 октября 2018

Привет. Я пытаюсь импортировать все таблицы, представленные на удаленном SQL Server, в разделы KSQL. Это свойства моего файла

connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
name=sqlservertest
tasks.max=1
initial.database=$$DATABASE
connection.url=jdbc:sqlserver://$$IP:1433;databaseName=$$DATABASE;user=$$USER;
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
topic.prefix=sqlservertest
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mode=bulk
auto.create=true
auto.evolve=true

, чем я

confluent load sqlservertest -d /opt/kakfkaconf/sqlservertest.properties

и в журнале

confluent log connect -f

показывает

[2018-10-10 14:18:43,856] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)

работает правильно, но ничего не импортирует, тема остается пустой

confluent status sqlservertest 
{
  "name": "sqlservertest",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.132.0.2:8083"
  },
  "tasks": [],
  "type": "source"
}

Я также изменил свойства

name=mssql
connector.class=io.confluent.connect.cdc.mssql.MsSqlSourceConnector
tasks.max=2
initial.database=$$DB
username=$$USER
password=$$PASS
server.name=$$IP
server.port=1433
change.tracking.tables=$$SCHEMA.$$TABLE
auto.create=true
auto.evolve=true
topic.prefix=$$DB
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

но я получаю эту ошибку

[2018-10-10 15:06:09,216] ERROR Exception thrown while querying for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE} (io.confluent.connect.cdc.mssql.QueryService:94)
org.apache.kafka.connect.errors.DataException: Exception thrown while getting metadata for ChangeKey{databaseName=$$DB, schemaName=$$SCHEMA, tableName=$$TABLE}
        at io.confluent.connect.cdc.CachingTableMetadataProvider.tableMetadata(CachingTableMetadataProvider.java:64)
        at io.confluent.connect.cdc.mssql.QueryService.queryTable(QueryService.java:108)
        at io.confluent.connect.cdc.mssql.QueryService.processTables(QueryService.java:92)
        at io.confluent.connect.cdc.mssql.QueryService.run(QueryService.java:67)
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$2.run(AbstractExecutionThreadService.java:60)
        at com.google.common.util.concurrent.Callables$3.run(Callables.java:95)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
        ... 6 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near '='.
        at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:259)
        at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1547)
        ... 11 more

1 Ответ

0 голосов
/ 25 октября 2018

Я нашел истинную причину этой ошибки, коннектор Kafka использует функции, присутствующие только в MS Sql server 2012, в частности сравнение IFF и boolan в функции

select IFF(1>2,'OK','KO');
select (1>2) as bool;

, которые НЕ работаютMS Sql 2008

Настоящая причина в том, что Conflunet MSSQL Connector создан только для MS SQL Server 2012 и выше, и я работаю в версии 2008

Я декомпилировал библиотеку kafka-connect-cdc-mssql и настроил SQL-код, чтобы он соответствовал sqlserver 2008, и теперь он работает.

Может быть, я отправлю его на github, чтобы сделать его доступным для всех

...