Я использую debezium для получения событий данных и kafka-connect-jdb c для внесения изменений
Это мой файл Docker, который я использую mysql-connector-java-5.1.39.jar
FROM debezium/connect:1.1
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION 5.1.39
ARG KAFKA_JDBC_VERSION=5.3.1
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
| tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
Это мой источник mysql db:
docker run -it --rm --name mysqltes -p 3308:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw
debezium/example-mysql:1.1
Это мой второй mysql db, в который должны быть внесены изменения:
docker run -it --rm --name mysql -p 3307:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.1
Это конфигурация моего источника mysql:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
-d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysqltes",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory"
}
}'
Это моя конфигурация коннектора приемника
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" 192.168.99.102:8083/connectors/ \
-d '{
"name": "inventory-connector-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql:3306/inventory?useSSL=false",
"connection.user": "debezium",
"connection.password": "dbz",
"topics": "dbserver1.inventory.customers",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
}'
, и я столкнулся с этой проблемой, я не знаю, зачем мне помогать с этой проблемой
2020-06-20 13:36:58,299 INFO || Attempting to open connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,352 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2020-06-20 13:36:58,415 INFO || Checking MySql dialect for existence of table "dbserver1"."inventory"."customers" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,423 INFO || Using MySql dialect table "dbserver1"."inventory"."customers" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
2020-06-20 13:36:58,428 INFO || Creating table with sql: CREATE TABLE `dbserver1`.`inventory`.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name` VARCHAR(256) NOT NULL,
`email` VARCHAR(256) NOT NULL,
PRIMARY KEY(`id`)) [io.confluent.connect.jdbc.sink.DbStructure]
2020-06-20 13:36:58,443 WARN || Create failed, will attempt amend if table already exists [io.confluent.connect.jdbc.sink.DbStructure]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,508 WARN || Write of 4 records failed, remainingRetries=7 [io.confluent.connect.jdbc.sink.JdbcSinkTask]
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at com.mysql.jdbc.StatementImpl.executeUpdateInternal(StatementImpl.java:1540)
at com.mysql.jdbc.StatementImpl.executeLargeUpdate(StatementImpl.java:2595)
at com.mysql.jdbc.StatementImpl.executeUpdate(StatementImpl.java:1468)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.applyDdlStatements(GenericDatabaseDialect.java:1076)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:93)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2020-06-20 13:36:58,562 INFO || Closing connection #1 to MySql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2020-06-20 13:36:58,582 INFO || Initializing writer using SQL dialect: MySqlDatabaseDialect [io.confluent.connect.jdbc.sink.JdbcSinkTask]
2020-06-20 13:36:58,584 ERROR || WorkerSinkTask{id=inventory-connector-sink-0} RetriableException from SinkTask: [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '.`customers` (
`last_name` VARCHAR(256) NOT NULL,
`id` INT NOT NULL,
`first_name' at line 1
... 12 more