Разъем debezium sqlserver выводит закодированные значения для числовых / десятичных полей - PullRequest
0 голосов
/ 06 апреля 2020

У меня есть эта таблица в SQL Сервер:

CREATE TABLE [dbo].[blast_info](
    [blast_id] [int] NOT NULL,
    [tnt_amount_kg] [decimal](18, 2) NOT NULL,
    [time_blasted] [datetime] NOT NULL,
    [hole_deep_ft] [numeric](9, 2) NULL,
    [hole_coord_n] [numeric](18, 6) NOT NULL,
    [hole_coord_e] [numeric](18, 6) NULL
) ON [PRIMARY]

Дебезиум настроен для работы в качестве плагина из слияния. Данные отправляются в Kafka, но когда я читаю их либо через python, либо через консоль-потребителя, я вижу закодированные значения для чисел c и десятичных типов:

{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":16,"tnt_amount_kg":"ANw=","time_blasted":1583125200000,"hole_deep_ft":"Aa4=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437125}
{"blast_id":17,"tnt_amount_kg":"eA==","time_blasted":1585803600000,"hole_deep_ft":"AOY=","hole_coord_n":"A/OOVvYA","hole_coord_e":"AKSQkBwA","__ts_ms":1586140437126}
Processed a total of 38 messages

Почему это и что будет исправлением ? Спасибо.

1 Ответ

0 голосов
/ 08 апреля 2020

Возможный Quickfix (совсем не рекомендуется):

Просто используйте real тип данных в SQL Сервер вместо numeric или decimal, так как Debezium будет хранить real as float.

Долгосрочное исправление:

Как упомянуто в Debezium SQL Документация по соединителю сервера , в нем хранится decimal и numeric значения в виде binary, который представлен классом org.apache.kafka.connect.data.Decimal.

Вы можете извлечь эту информацию из самого сообщения, но для этого вам необходимо включить схему в сообщениях. Вы можете сделать это, установив key.converter.schemas.enable=true (для ключа сообщения) и value.converter.schemas.enable=true (для значения сообщения).

После изменения указанных выше свойств ваше сообщение будет иметь информацию о схеме. См. Этот пример:

Схема таблицы:

CREATE TABLE [dbo].[kafka_datatype](
    [id] [int] IDENTITY(1,1)  PRIMARY KEY,
    [col_value] [varchar](10) NULL,
    [create_date] [datetime] NULL,
    [col_decimal] [decimal](33, 18) NULL,
    [col_double] [real] NULL,
    [comments] [varchar](5000) NULL
) 

Kafka Сообщение:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "col_value"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "create_date"
      },
      {
        "type": "bytes",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "18",
          "connect.decimal.precision": "33"
        },
        "field": "col_decimal"
      },
      {
        "type": "float",
        "optional": true,
        "field": "col_double"
      },
      {
        "type": "string",
        "optional": true,
        "field": "comments"
      }
    ],
    "optional": true,
    "name": "TEST.dbo.kafka_datatype.Value"
  },
  "payload": {
    "id": 15,
    "col_value": "test",
    "create_date": 1586335960297,
    "col_decimal": "AKg/JYrONaAA",
    "col_double": 12.12345,
    "comments": null
  }
}

Пожалуйста, прочитайте Debezium SQL Документация по соединителю сервера знать, как Дебезиум обрабатывает типы данных.

Теперь перейдем к потребительской части, используйте разъемы раковины (например, JDB C Sink Connector ) в соответствии с вашими потребностями. Если вы хотите использовать python или консольный потребитель, вам нужно написать собственный десериализатор.

PS:

Одна проблема, которая может возникнуть со временем, размер topi c будет увеличиваться по мере сохранения схемы каждого сообщения. Чтобы избежать сохранения схемы в сообщении, вы можете использовать Avro Converter и Registry Schema , предоставленные Confluent.

Надеюсь, это поможет!

...