Debezium из MySQL в Postgres с JDBC Sink - изменение transforms.route.replacement дает ошибку SinkRecordField - PullRequest
0 голосов
/ 29 января 2019

Я использую это Дебезиум-примеры

source.json

{
"name": "inventory-connector",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
}
}

jdbc-sink.json

{
"name": "jdbc-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
}
}

работает нормально. Но когда я внес некоторые изменения, как описано в следующем сценарии.это дает мне ошибку 'SinkRecordField'.

Сценарий

Я изменил эти свойства из источника

    "transforms.route.replacement": "my-$2"

сейчасэто создает тему в кафке следующим образом:

my-inventory

Когда я указал тему = my-inventory в jdbc-sink, это дает мне следующее исключение [io.confluent.connect.jdbc.sink.DbStructure]

connect_1    | 2019-01-29 10:34:32,218 INFO   ||  Unable to find fields [SinkRecordField{schema=Schema{STRING}, name='email', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='first_name', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='last_name', isPrimaryKey=false}] among column names [street, customer_id, city, state, id, type, zip]   [io.confluent.connect.jdbc.sink.DbStructure]
connect_1    | 2019-01-29 10:34:32,220 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.   [org.apache.kafka.connect.runtime.WorkerSinkTask]
connect_1    | org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='email', isPrimaryKey=false}, as it is not optional and does not have a default value
connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:133)

Примечание: В Db создается таблица с именем 'my-inventory'

1 Ответ

0 голосов
/ 09 апреля 2019

Приемник JDBC ожидает одну таблицу для каждой темы, а также одну схему (имена столбцов x типов) для каждой темы.

Ваша маршрутизация регулярного выражения на стороне Debezium / source фактически выводит any таблица (может включать в себя некоторые системные, хотя я не помню, чтобы это было значение по умолчанию в конфигурации) в базе данных inventory к теме "мой инвентарь".

Поэтому, как только выЕсли бы в этой теме было занято более одной таблицы, вы могли бы столкнуться с проблемами ...

...