У нас есть Oracle Source, оттуда нужно получить данные, с ошибкой в формате Avro и Json.
Файл коннектора
{
"name": "LITERAL_VALUES",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"connection.user": "<user>",
"connection.password": "<Password>",
"tasks.max": "1",
"connection.url": "jdbc:oracle:thin:@<server>:<Port>/<Schema>",
"mode": "bulk",
"topic.prefix": "LITERAL_VALUES",
"batch.max.rows":1000,
"numeric.mapping":"best_fit",
"query":"SELECT abc from xyz"
}
}
Ошибка при использовании в формате Avro
DataException: Cannot deserialize type int64 as type float64
Ошибка при использовании в формате JSON
WARN task [0_0] Skipping record due to deserialization error. topic=[LITERAL_VALUES_JSON] partition=[0] offset=[12823] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: LITERAL_VALUES_JSON
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xf01ae03 (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)
Попытался создать файл соединителя со свойством table.whitelist и использовать его с помощью ksql
Unable to verify the AVRO schema is compatible with KSQL. Subject not found. io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:50)
Проверенная схема отдыха
{
"subject": "RAW-LITERAL_VALUES-value",
"version": 1,
"id": 16,
"schema": "{\"type\":\"record\",\"name\":\"LITERAL_VALUES\",\"fields\":[{\"name\":\"LITERAL_ID\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":127,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"127\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"LITERAL_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LITERAL_VALUE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_INSTANCE_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EFF_STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"EFF_END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"CRTD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"CRTD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"LST_UPD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LST_UPD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}],\"connect.name\":\"LITERAL_VALUES\"}"
}
Любая помощь высоко ценится.