Я пытался производить данные с использованием Javaclient и сохранять данные в postgres с использованием коннектора раковины. я могу публиковать данные в topi c в формате protobuf, но коннектор раковины, кажется, не работает. Просто убедитесь, что данные публикуются и доступны в topi c, я построил потребителя в Java -клиенте, и я могу видеть данные.
Так что данные есть в topi c, но что-то не так с моим конфигом коннектора раковины. Пожалуйста, найдите подробности конфигурации ниже, любая помощь очень ценится!
connect-standalone.properties:
bootstrap .servers = localhost: 9092
key.converter = com.blueapron.connect.protobuf.ProtobufConverter
value.converter = com.blueapron.connect.protobuf.ProtobufConverter
key.converter.schemas.enable = false
value.converter.schemas.enable = true
offset.storage.file.filename = / tmp / connect.offsets
offset.flu sh .interval. ms = 10000
connect- postgres -protobuf.properties:
имя = раковина- postgres connector.class = io.confluent.connect.jdb c .JdbcSinkConnector
tasks.max = 2 темы = docker -proto-topi c
connection.url = jdb c : postgresql: // localhost: 5432 / kafka-test
connection.user = postgres connection.password = *** insert.mode = insert
value.converter = com .blueapron.connect.protobuf.ProtobufConverter
value.co nverter.protoClassName = com.test.extraction.model.protobuf.AlbumOuterClass $ Album
key.converter = com.blueapron.connect.protobuf.ProtobufConverter
auto.create = true auto.evolve = false
offset.storage.file.filename = / tmp / post-sink.offsets
Данные альбома:
artist: "Tears For Fears"
song_title: "Songs from the Big Chair"
Журнал ошибок:
[task-thread-sink-postgres-1] INFO io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect - Checking PostgreSql dialect for existence of table "docker-proto-topic-2"
[task-thread-sink-postgres-1] INFO io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect - Using PostgreSql dialect table "docker-proto-topic-2" absent
[task-thread-sink-postgres-1] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - WorkerSinkTask{id=sink-postgres-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.
org.apache.kafka.connect.errors.ConnectException: null (ARRAY) type doesn't have a mapping to the SQL database column type
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1727)