MySQL для приема данных Enterprise Kafka - PullRequest
0 голосов
/ 10 ноября 2018

У нас есть 3-узелный корпоративный кластер kafka (linux on-prem), и на одном узле запущена служба подключения kafka. Мы хотим включить данные в тему kafka, используя mysql.

Пробовал следующий материал -

1.Установил mysql на мой локальный рабочий стол Windows, создал базу данных, таблицу и вставил в нее некоторые данные.

2.Создал файл source-quickstart-mysql.properties с подробностями ниже

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://<IPAddressOfLocalMachine>:3306/test_db?user=root&password=pwd
tables.whitelist=emp
mode=incrementing
incrementing.column.name=empid
topic.prefix=test-mysql-jdbc-

У connect-standalone.properties есть эта информация:

bootstrap.servers=IPaddressOfKCnode:9092
plugin.path=/usr/share/java
  1. Перезапущена служба подключения kafka

  2. Пытался отправить сервису kafka connect запрос на подключение к моему sql -

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'

Получение следующей ошибки здесь:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

Я также попробовал следующие вещи, как-

a. Перестал работать сервис kafka connect и запускался вручную -

systemctl stop confluent-kafka-connect

б. Запустил соединение вот так

/usr/bin/connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties

Этот процесс успешно начинается в начале, но через некоторое время умирает. Вот журналы:

[2018-11-10 19:42:53,027] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2018-11-10 19:42:53,048] INFO AbstractConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd
        connection.user = null
        dialect.name =
        incrementing.column.name = empid
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-mysql-jdbc-
        validate.non.null = true
 (org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:45:00,439] INFO AbstractConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin
        connection.user = null
        dialect.name =
        incrementing.column.name = empid
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-mysql-jdbc-
        validate.non.null = true
 (org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:47:07,666] ERROR Failed to create job for /etc/kafka-connect-jdbc/source-quickstart-mysql.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2018-11-10 19:47:07,668] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:415)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
[2018-11-10 19:47:07,669] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)

Я не смог найти гладкую и правильную документацию на смежном веб-сайте, чтобы использовать сервис Kafka Connect для его различных разъемов, конфигураций и т. Д. Пожалуйста, помогите получить правильные шаги для реализации конвейера приема данных: mySQL - kafkaconnect - kafka

В конце концов я ожидаю, что вставки в таблицу mysql произведут данные в теме kafka и потребителю kafka для отображения этих записей. Этот прием выглядит простым, но я упускаю некоторые основные свойства соединения: (

Спасибо!

Ответы [ 3 ]

0 голосов
/ 12 ноября 2018

В первом случае ваша ошибка была возвращена вам из вывода команды curl:

Connector configuration is invalid and contains the following 2 error(s) java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd

Итак, вы пропустили драйвер JDBC для MySQL из вашего пути Kafka Connect.


Вторая ошибка в опубликованном вами выводе:

Connector configuration is invalid and contains the following 2 error(s): Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure. The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin

это говорит о проблеме с подключением Kafka Connect к вашей машине MySQL.

Откуда вы запускаете Confluent Platform, в Docker, на машине, локальной для MySQL и т. Д.? * 192.168.178.14 - это адрес вашего сервера MySQL, и можно ли его получить с хоста, на котором работает Kafka Connect?


Вы можете найти несколько примеров настройки MySQL с помощью Kafka:

О плюсах и минусах коннектора JDBC и CDC на основе журнала см. https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc.

Отказ от ответственности: я написал вышеупомянутые сообщения в блоге.

0 голосов
/ 15 ноября 2018

Спасибо, Робин и Гиоргос, за ваши ответы! Это очень помогло. Эта проблема была связана с несколькими вещами - 1. Отсутствует JAR-коннектор MySQL JDBC. Мы должны поместить MySQL Connector / J 8.0.13 в /usr/share/java/kafka-connect-jdbc/.

2. Проблема с подключением была вызвана тем, что пользователь MySQL, к которому пытался подключиться kafka, не имел привилегий для подключения к службе удаленного подключения. Для этого я создал нового пользователя mySQL с полным разрешением и доступом к удаленному серверу (Kafka connect).

После выполнения описанных выше шагов перезапустите kafka-connect, и конвейер приема пищи начал работать.

0 голосов
/ 11 ноября 2018

Кажется, проблема с разъемом JDBC. Какую версию MySQL вы используете? Чтобы решить проблему, вам необходимо:

  1. Загрузите Connector / J 8.0.13 , если вы используете MySQL 8 или 5.1.47 для более старых версий.
  2. Поместите файл в банку под /usr/share/java/kafka-connect-jdbc/.
  3. Перезапустите Kafka Connect и запустите ваш MySQL-коннектор.
...