Kafka Connect, JdbcSinkConnector - Получение «Ошибка при получении схемы Avro для идентификатора 1, тема не найдена; код ошибки: 40401» - PullRequest
0 голосов
/ 11 декабря 2018

Я создал поток NiFi, который в конечном итоге публикует записи json в виде записей с закодированными значениями Avro и строковыми ключами, используя схему в Confluent Registry для схемы значений.Вот конфигурация для AvroRecordSetWriter в NiFi.

Я сейчас пытаюсь использовать Kafka Connect (connect-standalone) для перемещения сообщений в базу данных PostgreSQL с помощью JdbcSinkConnector, но получаю следующую ошибку: Ошибка при получении схемы Avro для идентификатора 1

Я подтвердил, что у меня есть схема в моем Confluent Registry с идентификатором 1. Ниже приведены мои конфиги для задачи Connect

Worker Config:

bootstrap.servers=localhost:29092
key.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
rest.host.name=localhost
rest.port=8083
plugin.path=share/java

Конфигурация соединителя:

name=pg-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=rds
connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=xxxxxxxx
insert.mode=upsert
table.name.format=test_data
auto.create=true

Я создал поток в NiFi, который правильно использует сообщения, и я также успешно использовал сообщения (которые форматируются как JSON ввывод) с помощью kafka-avro-console-consumer, указав --property schema.registry.url=http://schema-registry:8081.Обратите внимание, что я запускаю потребителя в контейнере Docker, и поэтому URL-адрес не является локальным.

Я не уверен, что мне не хватает.Я думал только о том, что использую неправильный класс для преобразователя ключей, но это не имеет смысла с данной ошибкой.Кто-нибудь может увидеть, что я делаю не так?

1 Ответ

0 голосов
/ 12 декабря 2018

Я не знаю много о Nifi, но вижу, что имя схемы - "rds", и в журналах ошибок говорится, что он не нашел тему в реестре схемы.

Kafka использует KafkaAvroSerializer для сериализации записей avro и одновременно регистрации связанной схемы avro в реестре схем.Он использует KafkaAvroDeserializer для десериализации записей avro и извлечения связанной схемы из реестра схем.

Схема хранилища реестра схем по категориям, называемым "субъектами", и поведение по умолчанию для именования субъекта для записи: topic_name-value для записи значения и topic_name-key для ключа.

В вашем случае вы зарегистрировали схему не в Kafka, а в Nifi, поэтому я предполагаю, что имя "rds" появляется в илиэто имя субъекта в реестре схемы.

Как вы убедились, что ваша схема хранится в основном?

Обычно в вашем случае правильным субъектом будет rds-value, потому что вы используете реестр схемы только для записей значений.

...