Как фильтровать таблицы в базах данных в kafka jdbc connect source - PullRequest
0 голосов
/ 19 февраля 2019

Я использую Kafka Connect в Confluent Community Platform для синхронизации баз данных MySQL.Источниками и приемниками являются базы данных MySQL.Это не сработало.

В моих ситуациях есть некоторые проблемы:

  1. Есть таблицы в других базах данных на том же сервере, и я не хочу читать их в Kafka, ноKafka Connect Source продолжает пытаться читать другие базы данных.

  2. Я хочу использовать org.apache.kafka.connect.json.JsonConverter как для разъема источника, так и для разъема раковины, но разъемы приемника не могут быть вставлены правильно.

  3. Я хочуЧтобы синхронизировать несколько баз данных, таблицы в разных базах данных могут быть с одинаковыми именами таблиц, как избежать конфликта имен таблиц, и коннекторы приемников правильно направляют темы Kafka для вставки данных в нужные базы данных? Иллюстрация синхронизации MySQL

Файл конфигурации Kafka JDBC Source Connector:

{
       "name": "br-auths-3910472223-source",
       "config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

       "key.converter": "org.apache.kafka.connect.json.JsonConverter",
       "key.converter.schemas.enable":"true",
       "value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",

"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/br_auths?user=root&password=123456",
"database.whitelist":"br_auths",
"table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",

"mode": "timestamp",
"timestamp.column.name": "utime",
"validate.non.null": "false",

"incrementing.column.name": "id",
"topic.prefix": "br_auths__"
}
}

Файл конфигурации Kafka JDBC Sink Connector:

{
"name": "br-auths-3910472223-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"true",

    "tasks.max": "1",
    "connection.url": "jdbc:mysql://rm-hp303a0n2vr8970.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?user=br_auths&password=@123456",

    "topics": "br_auths__auths_roles,br_auths__auths_user_logins,br_auths__auths_user_roles,br_auths__auths_users,br_auths__auths_user_claims,br_auths__auths_user_tokens,br_auths__auths_role_claims", 

    "auto.create": "true",
    "insert.mode": "upsert",

    "transforms":"dropTopicPrefix",
    "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex":"br_auths__(.*)",
    "transforms.dropTopicPrefix.replacement":"$1" 
}
}

Я хочу создать несколько пар соединителей источника и приемника для разных баз данных, некоторые таблицы белого списка в базе данных A на MySQL-сервере A можно постепенно синхронизировать с базой A на MySQL-сервере B.

Обновление 1:

Я изменил на connect-avro-distribution, Debezium Source Connector и JDBC Sink Connector.Разъем источника:

{
   "name":"br-auths-3910472223-source",
   "config":{
       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
       "tasks.max": "1",
       "database.hostname": "localhost",
       "database.port": "3306",
       "database.user": "root",
       "database.password": "br123456",
       "database.useLegacyDatetimeCode": "false",
       "database.server.id": "184",
       "database.server.name": "local3910472223",
       "database.whitelist":"br_auths",
       "database.history.kafka.bootstrap.servers": "localhost:9092",
       "database.history.kafka.topic": "schema-changes.br-auths.local3910472223" ,
       "table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
       "include.schema.changes": "true",
       "transforms": "route,TimestampConverter",
       "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",  
       "transforms.TimestampConverter.target.type": "string", 
       "transforms.TimestampConverter.field": "payload.after.ctime", 
       "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
       "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
       "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
       "transforms.route.replacement": "$2__$3"  
    }
} 

А разъем приемника:

{
"name": "br-auths-3910472223-sink",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://rm-hp303a0n2.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?useLegacyDatetimeCode=false&user=br_auths&password=123456",
    "dialect.name": "MySqlDatabaseDialect",
    "topics.regex": "br_auths__(.*)",        
    "transforms": "dropTopicPrefix,unwrap",
    "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex":"br_auths__(.*)",
    "transforms.dropTopicPrefix.replacement":"$1",        
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "insert.mode": "upsert",
    "pk.fields": "Id",
    "pk.mode": "record_value"
    }
}

Сообщение Avro преобразуется в json следующим образом:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "Id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "UserId"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "RoleId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "APPID"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "default": 0,
                        "field": "IsDeleted"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "ctime"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "utime"
                    }
                ],
                "optional": true,
                "name": "local3910472223.br_auths.auths_user_roles.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "Id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "UserId"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "RoleId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "APPID"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "default": 0,
                        "field": "IsDeleted"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "ctime"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "name": "io.debezium.time.Timestamp",
                        "version": 1,
                        "default": 0,
                        "field": "utime"
                    }
                ],
                "optional": true,
                "name": "local3910472223.br_auths.auths_user_roles.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "server_id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_sec"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "gtid"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "file"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "pos"
                    },
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "row"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "query"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.mysql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            }
        ],
        "optional": false,
        "name": "local3910472223.br_auths.auths_user_roles.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "Id": "DB4DA841364860D112C3C76BDCB36635",
            "UserId": "0000000000",
            "RoleId": "5b7e5f9b4bc00d89c4cf96ae",
            "APPID": "br.region2",
            "IsDeleted": 0,
            "ctime": 1550138524000,
            "utime": 1550138524000
        },
        "source": {
            "version": "0.8.3.Final",
            "name": "local3910472223",
            "server_id": 0,
            "ts_sec": 0,
            "gtid": null,
            "file": "mysql-bin.000003",
            "pos": 64606,
            "row": 0,
            "snapshot": true,
            "thread": null,
            "db": "br_auths",
            "table": "auths_user_roles",
            "query": null
        },
        "op": "c",
        "ts_ms": 1550568556614
    }
}

Столбцы, использующиеТип даты и времени MySQL был сериализован в большое целое число, соединитель приемника JDBC попытался вставить в столбцы даты и времени MySQL, и это не удалось.

Итак, я написал transforms.TimestampConverter в конфигурации с исходным соединением, но столбцы ctime, utime не изменились.В чем дело?

1 Ответ

0 голосов
/ 19 февраля 2019
  1. Если вы хотите синхронизировать свои базы данных, коннектор JDBC Source не самый лучший - вы хотите использовать правильный CDC на основе журнала, который для MySQL вы можете получить с Debezium.Подробнее здесь .
  2. Если вы ничего не делаете с данными, вам вообще нужен Кафка?Будет ли выделенный инструмент репликации MySQL более подходящим?
  3. К вашим конкретным проблемам. В этой статье будут рассмотрены многие ваши вопросы.В частности:

    1. Есть таблицы в других базах данных на том же сервере, и я не хочу читать их в Kafka, но Kafka Connect Source продолжает пытаться читать другиебазы данных.

      Используйте комбинацию table.whitelist, table.blacklist и schema.pattern при необходимости.Если вы не можете сопоставить весь шаблон с одним разъемом, вам потребуется использовать несколько разъемов для достижения желаемого набора.

    2. Я хочу использовать org.apache.kafka.connect.json.JsonConverter как в исходном соединителе, так и в соединителе раковины, но соединители приемника не могут быть вставлены правильно.

      Без объяснения "не могу правильно вставить" сложно ответить на этот вопрос.В общем, я бы использовал Avro из-за более богатой поддержки схемы и более эффективных сообщений (нет встроенной схемы, схема хранится в реестре схем).См. здесь для получения более подробной информации.

    3. Я хочу синхронизировать несколько баз данных, таблицы в разных базах данных могут быть с одинаковыми именами таблиц, как избежать имен таблицконнекторы конфликта и приемника правильно маршрутизируют темы Kafka для вставки данных в нужные базы данных?

      Вы хотите использовать комбинацию topic.prefix на соединителе источника для пометки тем из определенного источника, изатем Single Message Transform RegexRouter (как вы уже нашли) для дальнейшей работы с именами тем, либо в соединителе источника и / или соединителе приемника.Вам может понадобиться несколько коннекторов приемника, использующих topics.regex для выбора определенных тем для маршрутизации в конкретную базу данных.

...