Я пытаюсь создать контрольный журнал для базы данных mysql, используя kafka, система должна обнаружить любые изменения в исходной базе данных и в ответ вставляет запись в целевую базу данных в соответствующую таблицу, создавая контрольный журнал для этого конкретного c таблица. В настоящее время я использую Debezium для CD C и jdb c разъемов приемника / источника. Требуемый вариант использования:
- Если запись вставлена в таблицу студентов в исходной базе данных, строка должна быть вставлена в целевую базу данных Students_trail со старыми и новые значения для этой строки
- одинаковые в случае обновлений и удалений
- в исходной базе данных имеется несколько таблиц, то же самое выше применимо ко всем таблицам
На данный момент, у меня есть следующие конфигурации соединителя источника и приемника, которые реплицируют любые изменения, происходящие в исходной базе данных на dest db, как вставка вставила бы ту же строку, а обновление обновило бы эту строку, поскольку у новичка в kafka есть ограниченные знания, как мы можем достичь вышеупомянутого упомянутый сценарий для ведения журнала аудита. спасибо
@source.json:
{
"name": "jdbc-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "",
"database.password": "",
"database.server.name": "localhost",
"database.whitelist": "dbname",
"database.server.id": "1234",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.topicname",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
@sink.json:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "",
"connection.url": "jdbc:mysql://localhost:3306",
"connection.user": "",
"connection.password": "",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"insert.mode": "upsert",
"auto.create": "true",
"delete.enabled": "true",
"auto.evolve": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}
Обновление:
при условии, что это одна из исходных таблиц БД (студент):
id name birth_date
1 den 2001-09-12
2 jeff 2002-09-02
, если я вставлю здесь новую строку, например, 3 foo 1999-09-28
, тогда текущие настройки будут вставлять ту же строку в dest db в соответствующей автоматически созданной таблице, но вместо этого я хочу, чтобы оба, до и после значения состояния (представление josn) были вставлены в пользовательскую таблицу схемы (например, Students_trail для этого случая), в которой есть столбцы (id, student_id, before, after , отметка времени) может быть создана автоматически или автоматически, в зависимости от того, что работает. В этом случае вставка должна создать новую строку в таблице dest и должна выглядеть следующим образом:
Students_trail
id student_id before after timestamp
98 3 null "3, foo, 1999-09-28" current_timestamp
В случае вставки before - ноль, для удаления after - ноль, для обновления как до, так и после состояния, и для каждой операции в соответствующую таблицу dest вставляется строка.