Вставьте сообщение Кафки в один столбец - PullRequest
0 голосов
/ 10 октября 2019

Я новичок в Кафке и изучаю его. Я использую сливную платформу для работы с Кафкой. Я создал тему в Kafka, используя Avro-console-produser с ключом и значением.

Образец ключа - значение записи "record" - { "id": 999, "product": "foo", "quantity": 100, "price": 50.0}

Я пытаюсь нажатьвсе это сообщение в один столбец в таблице назначения (оракул) с типом данных CLOB.

Соединитель JDBC Sink используется с автономным режимом.

Ниже приведены мои свойства работника и разъема:

КОНФИГ. РАБОТНИКА:

bootstrap.servers=64.102.162.197:9097,64.102.162.224:9097,64.102.163.41:9097
schemas.enable=false
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://64.102.162.197:8085,http://64.102.162.224:8085,http://64.102.163.41:8085
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://64.102.162.197:8085,http://64.102.162.224:8085,http://64.102.163.41:8085
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/home/Data/offsetStorage/connect.offsets
rest.port=8094
plugin.path=/home/confluent/confluent-5.2.1/share/java

CONNECTOR CONFIG:

name=test_clob
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:oracle:thin:@JDBC-URL
connection.user=ABC
connection.password=dsfkdnf
table.name.format=BH_TEST4
auto.create=false
auto.evolve=false
transforms=MakeMap
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=JSON

У меня была идея заключить сообщение в объект, чтобы оно было сопоставлено со столбцом в таблице оракулов, и я использовал поле подъема изSMT, чтобы сделать это. Но этот SMT создает сообщение как структуру, и я получаю сообщение об ошибке «Unsupported source data type: STRUCT».

Ниже приведена ошибка, которую я получил:

[2019-10-10 05:55:36,526] ERROR WorkerSinkTask{id=test_clob-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1436)
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:141)
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:135)
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:71)
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:139)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-10-10 05:55:36,527] ERROR WorkerSinkTask{id=test_clob-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Unsupported source data type: STRUCT
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1436)
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:141)
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:135)
        at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:71)
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:139)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        ... 10 more

Пожалуйста, помогите мне впонимание проблемы и способы ее решения. Заранее спасибо!

...