Должен ли исходный соединитель MySQL для Kafka JDBC иметь MySQL Server на локальном хосте? - PullRequest
0 голосов
/ 29 января 2019

Я довольно новичок в Kafka и пытаюсь настроить и запустить простую систему соединения kafka с исходным соединителем MySQL и соединителем поискового приемника Elasticsearch + Elastic;для основных целей потока данных.

Я следую инструкциям из https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/ и его части 2 (я убедился, что ES работает, имея простого производителя на стороне источника.)

Все настроено и работает, как и ожидалось, кроме разъема источника MySQL.Виртуальная машина, на которой я пытаюсь все это , не имеет установленного сервера MySQL .Часть учебника, посвященная СУБД. Я использую клиент для создания / изменения и работы с таблицами.Поэтому в свойствах источника я попытался:

"connection.url": "jdbc:mysql://IPaddressofDB:3306/DBname?user=uname&password=pwd"
"table.whitelist": "tablename"

Чтобы запустить соединители, я просто сделал ./confluent load connector-name

Как только я загрузил соединитель источника и проверил его состояние,выдает ошибку, что

"org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t ...
 Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
  1. Это даже правильно?Я что-то упустил полностью?

  2. Как указать connection.url для таких случаев, как я пытаюсь: где вы пытаетесь подключиться к различным серверам БД?Кажется, что почти все проблемы с примерами / git указывают только на localhost.

  3. Я не уверен, откуда взято admin_portal, я не указал, что вообще где-либо

**** РЕДАКТИРОВАНО дляПредложения @ robin-moffat (похоже, выдают ту же ошибку, что и раньше)

sourceconfig.json:

{
        "name": "jdbc_source_mysql_new",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://ipaddress:3306/dbname?user=uname&password=pwd",
                "table.whitelist": "dbname.tablename",
                "topic.prefix": "mysql-new-",
                "mode":"incrementing",
                "incrementing.column.name": "colname"
                }
}

Загружен разъем:

>curl -X POST -H "Content-Type: application/json" --data @sourceconfig.json http://localhost:8083/connectors

Проверьте состояниеразъема:

>curl -X GET localhost:8083/connectors/jdbc_source_mysql_new/tasks/0/status

  {"state":"FAILED","
     "trace": 
     "org.apache.kafka.connect.errors.ConnectException: Failed trying to validate that columns used for offsets are NOT NULL\n\t
     at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:400)\n\t
     at io.confluent.connect.jdbc.source.JdbcSourceTask.start(JdbcSourceTask.java:156)\n\t
     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\t
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\t
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\t
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\t
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\t
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\t
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t
 at java.lang.Thread.run(Thread.java:748)\n

 Caused by: java.sql.SQLSyntaxErrorException: Table 'admin_portal.tablename' doesn't exist\n\t
 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)\n\t
 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)\n\t
 at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\t
 at com.mysql.cj.jdbc.StatementImpl.executeQuery(StatementImpl.java:1218)\n\t
 at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2950)\n\t
 at com.mysql.cj.jdbc.DatabaseMetaData$7.forEach(DatabaseMetaData.java:2938)\n\t
 at com.mysql.cj.jdbc.IterateBlock.doForAll(IterateBlock.java:56)\n\t
 at com.mysql.cj.jdbc.DatabaseMetaData.getPrimaryKeys(DatabaseMetaData.java:2991)\n\t
 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.primaryKeyColumns(GenericDatabaseDialect.java:696)\n\t
 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:533)\n\t
 at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:513)\n\t
 at io.confluent.connect.jdbc.source.JdbcSourceTask.validateNonNullable(JdbcSourceTask.java:369)\n\t... 9 more\n",}

Ответы [ 2 ]

0 голосов
/ 30 января 2019

Это сработало после того, как я понизил версию коннектора My SQL с 8.x до 5.1.47 и поместил его в правильный $ CLASSPATH

mysql-connector-java-5.1.47.jar
0 голосов
/ 29 января 2019

Должен ли исходный соединитель MySQL для Kafka JDBC иметь MySQL Server на локальном хосте?

Нет.Он использует JDBC, который может подключаться к серверу на удаленном экземпляре.

  1. Это даже правильно?Я что-то упустил полностью?

Из того, что вы описали, вы на правильных строчках:)

Как указать connection.url для таких случаев, как я пытаюсь: где вы пытаетесь подключиться к различным серверам БД?Почти все примеры / проблемы с git и т. Д., Похоже, указывают только на localhost.

Вы можете увидеть пример здесь

Вам нужно настроить JDBCURL правильно, синтаксис для которого можно найти здесь для MySQL .

Я не уверен, откуда взялся admin_portal, я не указал, что вообще где-либо

Это будет зависеть от разрешений пользователя, с которым вы подключаетесьбаза данных.Вы должны убедиться, что у него есть доступ к таблице, из которой вы хотите прочитать данные.Вы также можете указать название таблицы, например,

"table.whitelist": "schema.tablename"
...