Журнал аудита базы данных с использованием сбора данных изменений kafka - PullRequest
1 голос
/ 13 апреля 2020

Я пытаюсь создать контрольный журнал для базы данных mysql, используя kafka, система должна обнаружить любые изменения в исходной базе данных и в ответ вставляет запись в целевую базу данных в соответствующую таблицу, создавая контрольный журнал для этого конкретного c таблица. В настоящее время я использую Debezium для CD C и jdb c разъемов приемника / источника. Требуемый вариант использования:

  • Если запись вставлена ​​в таблицу студентов в исходной базе данных, строка должна быть вставлена ​​в целевую базу данных Students_trail со старыми и новые значения для этой строки
  • одинаковые в случае обновлений и удалений
  • в исходной базе данных имеется несколько таблиц, то же самое выше применимо ко всем таблицам

На данный момент, у меня есть следующие конфигурации соединителя источника и приемника, которые реплицируют любые изменения, происходящие в исходной базе данных на dest db, как вставка вставила бы ту же строку, а обновление обновило бы эту строку, поскольку у новичка в kafka есть ограниченные знания, как мы можем достичь вышеупомянутого упомянутый сценарий для ведения журнала аудита. спасибо

@source.json:

{
    "name": "jdbc-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "",
        "database.password": "",
        "database.server.name": "localhost",
        "database.whitelist": "dbname",
        "database.server.id": "1234",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.topicname",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}

@sink.json:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "",
        "connection.url": "jdbc:mysql://localhost:3306",
        "connection.user": "",
        "connection.password": "",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "insert.mode": "upsert",
        "auto.create": "true",
        "delete.enabled": "true",
        "auto.evolve": "true",
        "pk.fields": "id",
        "pk.mode": "record_key"
    }
}

Обновление:

при условии, что это одна из исходных таблиц БД (студент):

id  name   birth_date
1   den    2001-09-12
2   jeff   2002-09-02

, если я вставлю здесь новую строку, например, 3 foo 1999-09-28

, тогда текущие настройки будут вставлять ту же строку в dest db в соответствующей автоматически созданной таблице, но вместо этого я хочу, чтобы оба, до и после значения состояния (представление josn) были вставлены в пользовательскую таблицу схемы (например, Students_trail для этого случая), в которой есть столбцы (id, student_id, before, after , отметка времени) может быть создана автоматически или автоматически, в зависимости от того, что работает. В этом случае вставка должна создать новую строку в таблице dest и должна выглядеть следующим образом:

Students_trail

id   student_id   before   after                 timestamp

98   3            null     "3, foo, 1999-09-28"  current_timestamp

В случае вставки before - ноль, для удаления after - ноль, для обновления как до, так и после состояния, и для каждой операции в соответствующую таблицу dest вставляется строка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...