Проблема с соединителем источника JDBC Oracle - PullRequest
0 голосов
/ 09 января 2019

У нас есть Oracle Source, оттуда нужно получить данные, с ошибкой в ​​формате Avro и Json.

Файл коннектора

{
  "name": "LITERAL_VALUES",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "connection.user": "<user>",
    "connection.password": "<Password>",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@<server>:<Port>/<Schema>",
    "mode": "bulk",
    "topic.prefix": "LITERAL_VALUES",
    "batch.max.rows":1000,
    "numeric.mapping":"best_fit",
    "query":"SELECT abc from xyz"
  }
}

Ошибка при использовании в формате Avro

DataException: Cannot deserialize type int64 as type float64

Ошибка при использовании в формате JSON

WARN task [0_0] Skipping record due to deserialization error. topic=[LITERAL_VALUES_JSON] partition=[0] offset=[12823] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: LITERAL_VALUES_JSON
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xf01ae03 (above 0x0010ffff) at char #1, byte #7)
        at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195)
        at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:243)

Попытался создать файл соединителя со свойством table.whitelist и использовать его с помощью ksql

Unable to verify the AVRO schema is compatible with KSQL. Subject not found. io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
        at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:50)

Проверенная схема отдыха

{
  "subject": "RAW-LITERAL_VALUES-value",
  "version": 1,
  "id": 16,
  "schema": "{\"type\":\"record\",\"name\":\"LITERAL_VALUES\",\"fields\":[{\"name\":\"LITERAL_ID\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":127,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"127\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"LITERAL_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LITERAL_VALUE\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"SOURCE_SYSTEM_INSTANCE_ID\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EFF_STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"EFF_END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"STRT_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"END_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"CRTD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"CRTD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"LST_UPD_BY\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LST_UPD_DT\",\"type\":[\"null\",{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}],\"connect.name\":\"LITERAL_VALUES\"}"
}

Любая помощь высоко ценится.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...