Ошибка вставки столбцов даты и времени MySQL с источником Kafka Connect Debezium и приемником JDBC - PullRequest
0 голосов
/ 20 февраля 2019

Я использую Kafka Connect в Confluent Community Platform для синхронизации баз данных MySQL.Источниками и приемниками являются базы данных MySQL.Это не сработало.

Цель MySQL Sync Моя исходная конфигурация коннектора:

{
   "name":"br-auths-3910472223-source",
   "config":{
   "connector.class": "io.debezium.connector.mysql.MySqlConnector",
   "tasks.max": "1",
   "database.hostname": "localhost",
   "database.port": "3306",
   "database.user": "root",
   "database.password": "br123456",
   "database.useLegacyDatetimeCode": "false",
   "database.server.id": "184",
   "database.server.name": "local3910472223",
   "database.whitelist":"br_auths",
   "database.history.kafka.bootstrap.servers": "localhost:9092",
   "database.history.kafka.topic": "schema-changes.br-auths.local3910472223" ,
   "table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
   "include.schema.changes": "true",
   "transforms": "route,TimestampConverter",
   "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",  
   "transforms.TimestampConverter.target.type": "string", 
   "transforms.TimestampConverter.field": "payload.after.ctime", 
   "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
   "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
   "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
   "transforms.route.replacement": "$2__$3"  
  }
} 

Но сообщение в json по-прежнему нравится:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "Id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "UserId"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "RoleId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "APPID"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "default": 0,
                        "field": "IsDeleted"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "ctime"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "utime"
                    }
                ],
                "optional": true,
                "name": "local3910472223.br_auths.auths_user_roles.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "Id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "UserId"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "RoleId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "APPID"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "default": 0,
                        "field": "IsDeleted"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "ctime"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "utime"
                    }
                ],
                "optional": true,
                "name": "local3910472223.br_auths.auths_user_roles.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "server_id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_sec"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "gtid"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "file"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "pos"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "row"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "query"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.mysql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "local3910472223.br_auths.auths_user_roles.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "Id": "DB4DA841364860D112C3C76BDCB36635",
            "UserId": "0000000000",
            "RoleId": "5b7e5f9b4bc00d89c4cf96ae",
            "APPID": "br.region2",
            "IsDeleted": 0,
            "ctime": 1550138524000,
            "utime": 1550138524000
        },
        "source": {
            "version": "0.8.3.Final",
            "name": "local3910472223",
            "server_id": 0,
            "ts_sec": 0,
            "gtid": null,
            "file": "mysql-bin.000003",
            "pos": 64606,
            "row": 0,
            "snapshot": true,
            "thread": null,
            "db": "br_auths",
            "table": "auths_user_roles",
            "query": null
        },
        "op": "c",
        "ts_ms": 1550568556614
    }
}

Столбцы ctime, utime были сериализованы в большое целое число, когда JDBC Sink Connector пытался вставить столбцы MySQL datetime, он генерировал исключения формата, ему требовался строковый формат «ГГГГ-ММ-ДД ЧЧ: мм: сс».Мое преобразование TimestampConverter не работает.

Есть ли какое-либо решение для правильного ввода и вывода столбцов даты и времени?

...