Я использую Confluent kafka-connect-jdb c для чтения данных из разных СУБД в kafka. Вот моя тестовая таблица:
CREATE TABLE DFOCUSVW.T4(
COL1 VARCHAR(100) NOT null,
COL2 DECIMAL(6, 3) NOT null,
COL3 NUMERIC(6, 3) NOT null,
COL4 DECIMAL(12, 9) NOT null,
COL5 NUMERIC(12, 9) NOT null,
COL6 DECIMAL(18, 15) NOT null,
COL7 NUMERIC(18, 15) NOT null,
COL8 INTEGER NOT null,
Td_Update_Ts timestamp NOT null,
PRIMARY KEY (col1)
);
На мой взгляд, numeri c .mapping = best_fit можно преобразовать для COL2 ... COL5 в FLOAT64 (точность 15 знаков), но COL6 ... COL7 должны быть сериализованы как байты без какого-либо преобразования, потому что они не помещаются в FLOAT64.
Вот автоматически сгенерированная схема AVRO, которая одинакова для numeri c .mapping = best_fit и numeri c .mapping = none:
{
"connect.name": "T4",
"fields": [
{
"name": "COL1",
"type": "string"
},
{
"name": "COL2",
"type": {
"connect.name": "org.apache.kafka.connect.data.Decimal",
"connect.parameters": {
"scale": "3"
},
"connect.version": 1,
"logicalType": "decimal",
"precision": 64,
"scale": 3,
"type": "bytes"
}
},
{
"name": "COL3",
"type": {
"connect.name": "org.apache.kafka.connect.data.Decimal",
"connect.parameters": {
"scale": "3"
},
"connect.version": 1,
"logicalType": "decimal",
"precision": 64,
"scale": 3,
"type": "bytes"
}
},
{
"name": "COL4",
"type": {
"connect.name": "org.apache.kafka.connect.data.Decimal",
"connect.parameters": {
"scale": "9"
},
"connect.version": 1,
"logicalType": "decimal",
"precision": 64,
"scale": 9,
"type": "bytes"
}
},
{
"name": "COL5",
"type": {
"connect.name": "org.apache.kafka.connect.data.Decimal",
"connect.parameters": {
"scale": "9"
},
"connect.version": 1,
"logicalType": "decimal",
"precision": 64,
"scale": 9,
"type": "bytes"
}
},
{
"name": "COL6",
"type": {
"connect.name": "org.apache.kafka.connect.data.Decimal",
"connect.parameters": {
"scale": "15"
},
"connect.version": 1,
"logicalType": "decimal",
"precision": 64,
"scale": 15,
"type": "bytes"
}
},
{
"name": "COL7",
"type": {
"connect.name": "org.apache.kafka.connect.data.Decimal",
"connect.parameters": {
"scale": "15"
},
"connect.version": 1,
"logicalType": "decimal",
"precision": 64,
"scale": 15,
"type": "bytes"
}
},
{
"name": "COL8",
"type": "int"
},
{
"name": "Td_Update_Ts",
"type": {
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"connect.version": 1,
"logicalType": "timestamp-millis",
"type": "long"
}
}
],
"name": "T4",
"type": "record"
}
Эта схема показывает, что даже в случае best_fit платформа connect не преобразовала логический тип «DECIMAL» в примитивный тип AVRO «double» для COL2 .. .COL5 перед передачей строк в сериализатор AVRO.
Эта схема также всегда сообщает точность как 64, что не соответствует спецификации AVRO c:
Из Avro spe c:
- масштаб, целое число JSON, представляющее масштаб (необязательно). Если не указано иное, масштаб равен 0.
- precision, целое число JSON, представляющее (максимальную) точность десятичных знаков, хранящихся в этом типе (обязательно).
Итак, « precision »для этих типов должно быть 6,12, а 18, а не 64!
При этом, десериализатор avro должен иметь достаточно информации для точной десериализации, но при чтении topi c с avro Потребитель консоли, я получаю:
{"COL1":"x2","COL2":"\u0003g“","COL3":"\u0003g“","COL4":"3ó1Ã\u0015","COL5":"3ó1Ã\u0015","COL6":"\u0003\u0018±š\u000E÷_y","COL7":"\u0003\u0018±š\u000E÷_y","COL8":2,"Td_Update_Ts":1583366400000}
{"COL1":"x3","COL2":"\u0004î3","COL3":"\u0004î3","COL4":"K;¨«\u0015","COL5":"K;¨«\u0015","COL6":"\u0004{÷\u0012l_y","COL7":"\u0004{÷\u0012l_y","COL8":3,"Td_Update_Ts":1583366400000}
{"COL1":"x1","COL2":"\u0001àó","COL3":"\u0001àó","COL4":"\u001CªºÛ\u0015","COL5":"\u001CªºÛ\u0015","COL6":"\u0001µl!±m_y","COL7":"\u0001µl!±m_y","COL8":1,"Td_Update_Ts":1583366400000}
Для этих данных:
INSERT INTO t4 VALUES('x1', 123.123, 123.123, 123.123456789, 123.123456789, 123.123456789012345, 123.123456789012345, 1, '2020-03-05 00:00:00.000000 +00:00');
INSERT INTO t4 VALUES('x2', 223.123, 223.123, 223.123456789, 223.123456789, 223.123456789012345, 223.123456789012345, 2, '2020-03-05 00:00:00.000000 +00:00');
INSERT INTO t4 VALUES('x3', 323.123, 323.123, 323.123456789, 323.123456789, 323.123456789012345, 323.123456789012345, 3, '2020-03-05 00:00:00.000000 +00:00');
Я пробовал kafka-avro-console-consumer как с --property value.schema, передающим указанное выше схему вручную и --property schema.registry.url = http://localhost: 8081
Итак, десериализатору явно не удалось использовать схему AVRO для правильного десерирования. Интересно, почему?