Если вы хотите создать конвейер данных в реальном времени, вам нужно использовать инструмент изменения данных (CDC), который может передавать изменения из MySQL. Я бы предложил Debezium , которая является распределенной платформой с открытым исходным кодом для сбора данных изменений.
Захватывающие вставки
Когда в таблицу добавляется новая запись, создается JSON, аналогичный приведенному ниже:
{
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"giorgos@abc.com"
},
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500369632,
"gtid":null,
"file":"mysql-bin.000003",
"pos":364,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"c",
"ts_ms":1500369632095
}
}
before
объект равен нулю, а after
объект содержит вновь вставленные значения. Обратите внимание, что атрибут op
равен c
, что указывает на то, что это событие CREATE.
Получение обновлений
При условии, что атрибут email
был обновлен, будет создан JSON, аналогичный приведенному ниже:
{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"giorgos@abc.com"
},
"after":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500369929,
"gtid":null,
"file":"mysql-bin.000003",
"pos":673,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"u",
"ts_ms":1500369929464
}
}
Обратите внимание op
, что теперь u
, указывая, что это было событие ОБНОВЛЕНИЕ before
объект показывает состояние строки перед обновлением, а after
объект фиксирует текущее состояние обновленной строки.
Захват удаляет
Теперь предположим, что строка была удалена;
{
"payload":{
"before":{
"id":1005,
"first_name":"Giorgos",
"last_name":"Myrianthous",
"email":"newEmail@abc.com"
},
"after":null,
"source":{
"name":"dbserver1",
"server_id":223344,
"ts_sec":1500370394,
"gtid":null,
"file":"mysql-bin.000003",
"pos":1025,
"row":0,
"snapshot":null,
"thread":13,
"db":"inventory",
"table":"customers"
},
"op":"d",
"ts_ms":1500370394589
}
}
op
new равно d
, что указывает на событие DELETE. Атрибут after
будет нулевым, а объект before
содержит строку перед удалением.
Вы также можете ознакомиться с обширным учебником , представленным на их сайте.
РЕДАКТИРОВАТЬ: Пример конфигурации для базы данных MySQL
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "3306", (4)
"database.user": "debezium", (5)
"database.password": "dbz", (6)
"database.server.id": "184054", (7)
"database.server.name": "fullfillment", (8)
"database.whitelist": "inventory", (9)
"database.history.kafka.bootstrap.servers": "kafka:9092", (10)
"database.history.kafka.topic": "dbhistory.fullfillment" (11)
"include.schema.changes": "true" (12)
}
}
1 Название нашего разъема, когда мы регистрируем его в Kafka Connect.
оказание услуг.
2 Имя этого класса коннектора MySQL.
3 Адрес
сервер MySQL.
4 Номер порта сервера MySQL.
5 Имя
пользователь MySQL, имеющий необходимые привилегии.
6 Пароль для
пользователь MySQL, имеющий необходимые привилегии.
7 разъемов
идентификатор, который должен быть уникальным в кластере MySQL и похож на
Свойство конфигурации MySQL для идентификатора сервера.
8 Логическое название
MySQL сервер / кластер, который формирует пространство имен и используется во всех
названия тем Кафки, в которые пишет коннектор, Кафка
Подключите имена схем и пространства имен соответствующих Avro
схема, когда используется Avro Connector.
9 Список всех баз данных
размещенный на этом сервере, этот соединитель будет контролировать Это
необязательно, и есть другие свойства для перечисления баз данных и
таблицы для включения или исключения из мониторинга.
10 Список Кафки
брокеры, которые этот разъем будет использовать для записи и восстановления DDL
заявления в тему истории базы данных.
11 Название базы данных
тема истории, где коннектор напишет и восстановит DDL
заявления. Эта тема предназначена только для внутреннего использования и не должна использоваться
потребителями.
12 Флаг, указывающий, что разъем должен
сгенерировать в теме изменения схемы с именем события полного заполнения с помощью
Изменения DDL, которые могут быть использованы потребителями.