Возможный Quickfix (совсем не рекомендуется):
Просто используйте real
тип данных в SQL Сервер вместо numeric
или decimal
, так как Debezium будет хранить real
as float
.
Долгосрочное исправление:
Как упомянуто в Debezium SQL Документация по соединителю сервера , в нем хранится decimal
и numeric
значения в виде binary
, который представлен классом org.apache.kafka.connect.data.Decimal
.
Вы можете извлечь эту информацию из самого сообщения, но для этого вам необходимо включить схему в сообщениях. Вы можете сделать это, установив key.converter.schemas.enable=true
(для ключа сообщения) и value.converter.schemas.enable=true
(для значения сообщения).
После изменения указанных выше свойств ваше сообщение будет иметь информацию о схеме. См. Этот пример:
Схема таблицы:
CREATE TABLE [dbo].[kafka_datatype](
[id] [int] IDENTITY(1,1) PRIMARY KEY,
[col_value] [varchar](10) NULL,
[create_date] [datetime] NULL,
[col_decimal] [decimal](33, 18) NULL,
[col_double] [real] NULL,
[comments] [varchar](5000) NULL
)
Kafka Сообщение:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "col_value"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "create_date"
},
{
"type": "bytes",
"optional": true,
"name": "org.apache.kafka.connect.data.Decimal",
"version": 1,
"parameters": {
"scale": "18",
"connect.decimal.precision": "33"
},
"field": "col_decimal"
},
{
"type": "float",
"optional": true,
"field": "col_double"
},
{
"type": "string",
"optional": true,
"field": "comments"
}
],
"optional": true,
"name": "TEST.dbo.kafka_datatype.Value"
},
"payload": {
"id": 15,
"col_value": "test",
"create_date": 1586335960297,
"col_decimal": "AKg/JYrONaAA",
"col_double": 12.12345,
"comments": null
}
}
Пожалуйста, прочитайте Debezium SQL Документация по соединителю сервера знать, как Дебезиум обрабатывает типы данных.
Теперь перейдем к потребительской части, используйте разъемы раковины (например, JDB C Sink Connector ) в соответствии с вашими потребностями. Если вы хотите использовать python или консольный потребитель, вам нужно написать собственный десериализатор.
PS:
Одна проблема, которая может возникнуть со временем, размер topi c будет увеличиваться по мере сохранения схемы каждого сообщения. Чтобы избежать сохранения схемы в сообщении, вы можете использовать Avro Converter и Registry Schema , предоставленные Confluent.
Надеюсь, это поможет!