невозможно принять изменения до mysql, используя столкновение с проблемой com. mysql .jdb c .exceptions.jdbc4.MySQLSyntaxErrorException: у вас есть ошибка в синтаксисе SQL; - PullRequest
0 голосов
/ 20 июня 2020

Я использую для получения событий данных и kafka-connect-jdb c для внесения изменений

Это мой файл Docker, который я использую mysql-connector-java-5.1.39.jar

FROM debezium/connect:1.1
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ENV MYSQL_DRIVER_VERSION 5.1.39

ARG KAFKA_JDBC_VERSION=5.3.1

RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar

RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar 

Это мой источник mysql db:

docker run -it --rm --name mysqltes -p 3308:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw 
debezium/example-mysql:1.1

Это мой второй mysql db, в который должны быть внесены изменения:

docker run -it --rm --name mysql -p 3307:3306 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  debezium/example-mysql:1.1

Это конфигурация моего источника mysql:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
  -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysqltes",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'

Это моя конфигурация коннектора приемника

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
  -d '{
  "name": "inventory-connector-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql:3306/inventory?useSSL=false",
    "connection.user": "debezium",
    "connection.password": "dbz",
    "topics": "dbserver1.inventory.customers",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}'

, и я столкнулся с этой проблемой, я не знаю, зачем мне помогать с этой проблемой

2020-06-20 13:36:58,299 INFO   ||  Attempting to open connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,352 INFO   ||  JdbcDbWriter Connected   [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2020-06-20 13:36:58,415 INFO   ||  Checking MySql dialect for existence of table "dbserver1"."inventory"."customers"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,423 INFO   ||  Using MySql dialect table "dbserver1"."inventory"."customers" absent   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,428 INFO   ||  Creating table with sql: CREATE TABLE `dbserver1`.`inventory`.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name` VARCHAR(256) NOT NULL,
`email` VARCHAR(256) NOT NULL,
PRIMARY KEY(`id`))   [io.confluent.connect.jdbc.sink.DbStructure]
2020-06-20 13:36:58,443 WARN   ||  Create failed, will attempt amend if table already exists   [io.confluent.connect.jdbc.sink.DbStructure]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
        at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
        at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
        at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,508 WARN   ||  Write of 4 records failed, remainingRetries=7   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
        at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
        at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
        at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
        at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,562 INFO   ||  Closing connection #1 to MySql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,582 INFO   ||  Initializing writer using SQL dialect: MySqlDatabaseDialect   [io.confluent.connect.jdbc.sink.JdbcSinkTask]
2020-06-20 13:36:58,584 ERROR  ||  WorkerSinkTask{id=inventory-connector-sink-0} RetriableException from SinkTask:   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1

        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1

        ... 12 more

1 Ответ

0 голосов
/ 21 июня 2020

Попробуйте добавить свойство "quote.sql.identifiers": "never" в конфигурацию соединителя приемника. В этом случае оператор CREATE TABLE должен быть:

CREATE TABLE dbserver1.inventory.customers (
  last_name VARCHAR(256) NOT NULL,
  id INT NOT NULL,
  first_name VARCHAR(256) NOT NULL,
  email VARCHAR(256) NOT NULL,
  PRIMARY KEY(id)
)

PS Это похоже на обходной путь и, возможно, есть ошибка в коннекторе JDB C.

...