Можно ли использовать файл-источник в качестве ввода И JDBC-сток в качестве вывода с Kafka? - PullRequest
1 голос
/ 03 июня 2019

В настоящее время я работаю над проектом Kafka, и моя проблема в том, что я могу прочитать файл с помощью коннектора исходных файлов и сохранить данные в теме.

Моя конфигурация:

connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor

Затем я пытаюсь отправить данные в мою базу данных Postgres с помощью коннектора Jdbc-sink.

Моя конфигурация:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=sensor
connection.url=jdbc:postgresql://localhost:5432/pg_data_eng
connection.user=vagrant
connection.password=vagrant
auto.create=true
key.converter=org.apache.kafka.connect.json.JsonConverter
schemas.enable=false

Обратите внимание, что я пробовал несколько разных конфигураций, но ничего не получалось. Я вижу свою ошибку, используя REST API:

http://localhost:18083/connectors/jdbc-sink/tasks/0/status

И я получаю это:

{"id":0,"state":"FAILED","worker_id":"127.0.1.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more\n"}

Я вижу, что Value schema must be of type Struct - это основная проблема, и, возможно, она связана с реестром схемы.

Я тоже попробовал это, добавив value.converter.schema.registry.url=http://localhost:8081 Но все еще не работает.

Я исследовал некоторые учебники в Интернете, но ни один из них не касался как file-source, так и jdbc-sink, поэтому мой вопрос: Возможно ли это сделать?

1 Ответ

1 голос
/ 03 июня 2019

Проблема заключается в том, что FileSourceConnect возвращает запись Connect со схемой String, а не Struct (именно этого обычно ожидают JDBC Sink и другие соединения).

Вы должны использовать преобразование для преобразования значения в структуру.

connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor

# Add this
transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line

Затем с входным файлом

Foo 
Bar

После преобразования сообщения станут такими (пожалуйста, используйте тему для подтверждения вручную)

{"line":"Foo"}
{"line":"Bar"}

Итак, вашей базе данных потребуется один line текстовый столбец.

Я также попробовал это, добавив value.converter.schema.registry.url = http://localhost:8081 Но все равно не работает.

Вам нужно будет использоватьAvro для этого, а не JSONConverter

...