Цель : использовать дебезиум для захвата изменений из Cloud SQL.Экземпляр Cloud SQL включен по протоколу SSL в соответствии с инструкциями здесь
Сценарий : у меня есть debezium connect, kafka и zookpeer, работающие в качестве док-контейнеров на моей локальной машине.Я проверил установку на экземпляре Cloud SQL без SSL.Вещи работают.Как только я включаю SSL, преобразую файлы pem (server-ca.pem
, client-cert.pem
, client-key.pem
) в хранилище ключей и хранилище доверенных сертификатов, монтирую их как файлы в контейнере докера подключения debezium, я получаю сообщение об ошибке в журналах контейнера Debezium как таковое (послеотправка запроса POST в конечную точку):
org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: Access denied for user 'user'@'redacted_my_ip' (using password: YES)
at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:297)
at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:278)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:81)
at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:53)
at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:331)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Access denied for user 'user'@'redacted_my_ip' (using password: YES)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:835)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:455)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:240)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:207)
at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:179)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:756)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:751)
at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:298)
at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:284)
... 14 more
Мой предварительный анализ : я просматривал журналы TRACE и исходный код.Согласно журналам, он может успешно проверить соединение
2019-04-10 10:11:45,777 INFO || Successfully tested connection for jdbc:mysql://redacted_ip:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=true&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL with user 'user' [io.debezium.connector.mysql.MySqlConnector]
Этот журнал генерируется сразу после строка 101
Здесь мы выполняем запрос к базе данных, который только при успешном выполнении пропустит поток управления в журнал.Это, по моему мнению, означает, что debezium connect может подключаться к базе данных, но не работает в каком-то другом месте.Согласно трассировке стека, debezium не может подключиться второй раз, здесь
Полезная нагрузка, которую я отправляю, выглядит следующим образом:
{
"name": "connector-test",
"config": {
"connector.class": "MySql",
"tasks.max": "1",
"database.hostname": "redacted_ip",
"database.port": "3306",
"database.user": "user",
"database.password": "redacted_user_password",
"database.server.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"database.ssl.mode": "required",
"database.ssl.keystore": "./keystore",
"database.ssl.keystore.password": "redacted_keystore_password",
"database.ssl.truststore": "./truststore",
"database.ssl.truststore.password": "redacted_truststore_password"
}
}
Какие шаги необходимыдля того, чтобы вышеприведенная настройка работала