Копирование данных из Кафки в MySQL, не может подключиться к JDBCSinkConnector с помощью DOcker и Debezium - PullRequest
0 голосов
/ 06 ноября 2019

Привет! Я использую debezium для захвата изменений в Mongo и отправки их в mysql. Я использую следующую ссылку https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt, где я заменяю конечную базу данных postgres на базу данных mysql, но я не могу этого сделать.

Это мой пересмотренный jdbc-sink.json, где я использую mysql url для подключения.

{
    "name" : "jdbc-sink",
    "config" : {
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max" : "1",
        "topics" : "customers",
        "connection.url" : "jdbc:mysql://localhost:3306/inventorydb?user=user&password=password",
        "auto.create" : "true",
        "auto.evolve" : "true",
        "insert.mode" : "upsert",
        "delete.enabled": "true",
        "pk.fields" : "id",
        "pk.mode": "record_key",
        "transforms": "mongoflatten",
        "transforms.mongoflatten.type" : "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
        "transforms.mongoflatten.drop.tombstones": "false"
    }
}

Но я получаю следующую ошибку при запуске

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

HTTP / 1.1 500 Внутренняя ошибка сервера Дата: среда, 06 ноября 2019 г. 08:13:39 GMT Тип содержимого: application / json Длина содержимого: 3404 Сервер: Jetty (9.4.18.v20190429)

{" error_code": 500, "message": "Не удалось найти любой класс, который реализует Connector и имя которого соответствует io.confluent.connect.jdbc.JdbcSinkConnector , доступны следующие соединители: PluginDesc {klass= class io.debezium.connector.mongodb.MongoDbConnector, name = 'io.debezium.connector.mongodb.MongoDbConnector', версия = '1.0.0-SNAPSHOT', encodedVersion = 1.0.0-SNAPSHOT, тип = источник, typeName ='source', location = 'file: / kafka / connect / debezium-connector-mongodb /'}, PluginDesc {klass = class io.debezium.connector.mysql.MySqlConnector, name = 'io.debezium.connector.mysql.MySqlConnector', версия =' 1.0.0-SNAPSHOT ', encodedVersion = 1.0.0-SNAPSHOT, тип = источник, typeName =' source ', местоположение =' файл: / kafka / connect / debezium-connector-mysql / '}, PluginDesc{klass = class io.debezium.connector.oracle.OracleConnector, name = 'io.debezium.connector.oracle.OracleConnector', версия = '1.0.0-SNAPSHOT', encodedVersion = 1.0.0-SNAPSHOT, тип = источник,typeName = 'source', location = 'file: / kafka / connect / debezium-connector-oracle/ '}, PluginDesc {klass = class io.debezium.connector.postgresql.PostgresConnector, name =' io.debezium.connector.postgresql.PostgresConnector ', версия =' 1.0.0-SNAPSHOT ', кодированная версия = 1.0.0-SNAPSHOT, type = source, typeName = 'source', location = 'file: / kafka / connect / debezium-connector-postgres /'}, PluginDesc {klass = class io.debezium.connector.sqlserver.SqlServerConnector, name = 'io. debezium.connector.sqlserver.SqlServerConnector ', версия =' 1.0.0-SNAPSHOT ', encodedVersion = 1.0.0-SNAPSHOT, тип = источник, typeName =' source ', location =' файл: / kafka / connect / debezium-connector-sqlserver / '}, PluginDesc {klass = class org.apache.kafka.connect.file.FileStreamSinkConnector, name =' org.apache.kafka.connect.file.FileStreamSinkConnector ', версия =' 2.3.0 ', encodedVersion = 2.3.0, type = sink, typeName = 'sink', location = 'classpath'}, PluginDesc {klass = class org.apache.kafka.connect.file.FileStreamSourceConnector, name = 'org.apache.kafka.connect.file. FileStreamSourceConnector ', версия =' 2.3.0 ', encodedVersion = 2.3.0, тип = источник, typeName = 'source', location = 'classpath'}, PluginDesc {klass = class org.apache.kafka.connect.tools.MockConnector, name = 'org.apache.kafka.connect.tools.MockConnector', version = '2.3.0 ', encodedVersion = 2.3.0, тип = соединитель, typeName =' соединитель ', местоположение =' classpath '}, PluginDesc {klass = класс org.apache.kafka.connect.tools.MockSinkConnector, name =' org. apache.kafka.connect.tools.MockSinkConnector ', версия =' 2.3.0 ', encodedVersion = 2.3.0, тип = сток, typeName =' сток ', location =' classpath '}, PluginDesc {klass = class org.apache.kafka.connect.tools.MockSourceConnector, name = 'org.apache.kafka.connect.tools.MockSourceConnector', версия = '2.3.0', encodedVersion = 2.3.0, type = source, typeName = 'source', location= 'classpath'}, PluginDesc {klass = class org.apache.kafka.connect.tools.SchemaSourceConnector, name = 'org.apache.kafka.connect.tools.SchemaSourceConnector', версия = '2.3.0', encodedVersion = 2.3.0, type = source, typeName = 'source', location = 'classpath'}, PluginDesc {klass = class org.apache.kafka.connect.tools. VerifiableSinkConnector, name = 'org.apache.kafka.connect.tools.VerifiableSinkConnector', версия = '2.3.0', encodedVersion = 2.3.0, type = source, typeName = 'source', location = 'classpath'}, PluginDesc{klass = class org.apache.kafka.connect.tools.VerifiableSourceConnector, name = 'org.apache.kafka.connect.tools.VerifiableSourceConnector', версия = '2.3.0', encodedVersion = 2.3.0, type = source,typeName = 'source', location = 'classpath'} "}

Я понимаю, что некоторые не могут найти io.confluent.connect.jdbc.JdbcSinkConnector, но как я должен / и где я должендержи такую ​​баночку.

Спасибо

1 Ответ

0 голосов
/ 07 ноября 2019

Вы не предоставили соединитель приемника в Kafka Connect, обратите внимание, что команда docker-compose up --build -d используется для запуска создания нового образа Connect с соединителем приемника JDBC, запеченным в https://github.com/debezium/debezium-examples/blob/master/unwrap-mongodb-smt/debezium-jdbc/Dockerfile#L10

...