Я довольно новичок в Kafka и пытаюсь настроить и запустить простую систему соединения kafka с исходным соединителем MySQL и соединителем поискового приемника Elasticsearch + Elastic;для основных целей потока данных.
Я следую инструкциям из https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ и его части 2 (я убедился, что ES работает, имея простого производителя на стороне источника.)
Все настроено и работает, как и ожидалось, кроме разъема источника MySQL.Виртуальная машина, на которой я пытаюсь все это , не имеет установленного сервера MySQL .Часть учебника, посвященная СУБД. Я использую клиент для создания / изменения и работы с таблицами.Поэтому в свойствах источника я попытался:
"connection.url": "jdbc:mysql://IPaddressofDB:3306/DBname?user=uname&password=pwd"
"table.whitelist": "tablename"
Чтобы запустить соединители, я просто сделал ./confluent load connector-name
Как только я загрузил соединитель источника и проверил его состояние,выдает ошибку, что
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t ...
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
Это даже правильно?Я что-то упустил полностью?
Как указать connection.url для таких случаев, как я пытаюсь: где вы пытаетесь подключиться к различным серверам БД?Кажется, что почти все проблемы с примерами / git указывают только на localhost.
Я не уверен, откуда взято admin_portal
, я не указал, что вообще где-либо
**** РЕДАКТИРОВАНО дляПредложения @ robin-moffat (похоже, выдают ту же ошибку, что и раньше)
sourceconfig.json:
{
"name": "jdbc_source_mysql_new",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ipaddress:3306/dbname?user=uname&password=pwd",
"table.whitelist": "dbname.tablename",
"topic.prefix": "mysql-new-",
"mode":"incrementing",
"incrementing.column.name": "colname"
}
}
Загружен разъем:
>curl -X POST -H "Content-Type: application/json" --data @sourceconfig.json http://localhost:8083/connectors
Проверьте состояниеразъема:
>curl -X GET localhost:8083/connectors/jdbc_source_mysql_new/tasks/0/status
{"state":"FAILED","
"trace":
"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:400)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:156)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\t
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\t
at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\t
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\t
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t
at java.lang.Thread.run(Thread.java:748)\n
Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)\n\t
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)\n\t
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\t
at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2950)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2938)\n\t
at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)\n\t
at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:2991)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:696)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:533)\n\t
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:513)\n\t
at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:369)\n\t... 9 more\n",}