Как получить имя таблицы и имя базы данных в событии CDC, полученном от debezium kafka connect - PullRequest
1 голос
/ 02 мая 2019

Настройка: У меня включен CDC на MS SQL Server, и события CDC передаются в Kafka с помощью debezium kafka connect (source). Кроме того, несколько событий CDC таблицы направляются в одну тему в Kafka.

Вопрос: Поскольку у меня есть несколько табличных данных в теме kafka, я хотел бы иметь имя таблицы и имя базы данных в данных CDC.

Я получаю имя таблицы и имя базы данных в MySQL CDC, но не в MS SQL CDC.

Ниже приведен исходный соединитель Debezium для SQL Server

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "cdc-user_profile-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "<<hostname>>",
    "database.port": "<<port>>",
    "database.user": "<<username>>",
    "database.password": "<<password>>",
    "database.server.name": "test",
    "database.dbname": "testDb",
    "table.whitelist": "schema01.table1,schema01.table2",
    "database.history.kafka.bootstrap.servers": "broker:9092",
    "database.history.kafka.topic": "digital.user_profile.schema.audit",
    "database.history.store.only.monitored.tables.ddl": true,
    "include.schema.changes": false,
    "event.deserialization.failure.handling.mode": "fail",
    "snapshot.mode": "initial_schema_only",
    "snapshot.locking.mode": "none",
    "transforms":"addStaticField,topicRoute",
    "transforms.addStaticField.type":"org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addStaticField.static.field":"source_system",
    "transforms.addStaticField.static.value":"source_system_1",
    "transforms.topicRoute.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRoute.regex":"(.*)",
    "transforms.topicRoute.replacement":"digital.user_profile",
    "errors.tolerance": "none",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.retry.delay.max.ms": 60000,
    "errors.retry.timeout": 300000
  }
}'

Я получаю вывод ниже (демонстрационные данные)

{
  "before": {
    "profile_id": 147,
    "email_address": "test@gmail.com"
  },
  "after": {
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false
  },
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"
}

Мое требование - получить как показано ниже

{
  "before": {
    "profile_id": 147,
    "email_address": "test@gmail.com"
  },
  "after": {
    "profile_id": 147,
    "email_address": "test_modified@gmail.com"
  },
  "source": {
    "version": "0.9.4.Final",
    "connector": "sqlserver",
    "name": "test",
    "ts_ms": 1556723528917,
    "change_lsn": "0007cbe5:0000b98c:0002",
    "commit_lsn": "0007cbe5:0000b98c:0003",
    "snapshot": false,
    "db": "testDb",
    "table": "table1/table2"
  },
  "op": "u",
  "ts_ms": 1556748731417,
  "source_system": "source_system_1"
}

Ответы [ 2 ]

0 голосов
/ 02 мая 2019

Это запланировано как часть https://issues.jboss.org/browse/DBZ-875 вопроса

0 голосов
/ 02 мая 2019

Debezium Kafka-Connect обычно помещает данные из каждой таблицы в отдельную тему, а имя темы имеет формат hostname.database.table. Обычно мы используем имя темы, чтобы различать исходную таблицу и имя базы данных.

Если вы помещаете данные из всех таблиц вручную в одну тему, возможно, вам придется добавить таблицу и имя базы данных вручную.

...