Я создал поток NiFi, который в конечном итоге публикует записи json в виде записей с закодированными значениями Avro и строковыми ключами, используя схему в Confluent Registry для схемы значений.Вот конфигурация для AvroRecordSetWriter в NiFi.
Я сейчас пытаюсь использовать Kafka Connect (connect-standalone
) для перемещения сообщений в базу данных PostgreSQL с помощью JdbcSinkConnector, но получаю следующую ошибку: Ошибка при получении схемы Avro для идентификатора 1
Я подтвердил, что у меня есть схема в моем Confluent Registry с идентификатором 1. Ниже приведены мои конфиги для задачи Connect
Worker Config:
bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java
Конфигурация соединителя:
name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true
Я создал поток в NiFi, который правильно использует сообщения, и я также успешно использовал сообщения (которые форматируются как JSON ввывод) с помощью kafka-avro-console-consumer, указав --property schema.registry.url=http://schema-registry:8081
.Обратите внимание, что я запускаю потребителя в контейнере Docker, и поэтому URL-адрес не является локальным.
Я не уверен, что мне не хватает.Я думал только о том, что использую неправильный класс для преобразователя ключей, но это не имеет смысла с данной ошибкой.Кто-нибудь может увидеть, что я делаю не так?