Соединитель Kafka и реестр схем - Ошибка при получении схемы Avro - Тема не найдена - PullRequest
1 голос
/ 23 апреля 2019

У меня есть тема, в которой будет много разных схем. На данный момент это только один. Я создал работу подключения через REST, как это:

{
 "name":"com.mycompany.sinks.GcsSinkConnector-auth2",
 "config": {
    "connector.class": "com.mycompany.sinks.GcsSinkConnector",
    "topics": "auth.events",
    "flush.size": 3,
    "my.setting":"bar",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry-service:8081",
    "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "group.id":"account-archiver"

 }
}

Затем я отправляю сообщение в эту тему с помощью строкового ключа и сериализованной полезной нагрузки avro. Если я осматриваю тему в центре управления, я вижу, как правильно поступают десериализованные данные. Глядя на выход из экземпляра подключения, хотя я вижу это в журналах

RROR WorkerSinkTask{id=com.mycompany.sinks.GcsSinkConnector-auth2-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic auth.events to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 7
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Отсюда видно, что есть две взаимосвязанные проблемы:

  • Error retrieving Avro schema for id 7
  • Subject not found.; error code: 40401

Что меня беспокоит, так это то, что я определил стратегию RecordNameStrategy, которая, я думаю, должна использовать магический байт, чтобы пойти и получить схему, а не имя темы, но в Subject не найдены ошибки. Я не уверен, что он ищет имя субъекта или получает схему по идентификатору. В любом случае, подключившись к экземпляру connect и выполнив http://schema-registry-service:8081/schemas/ids/7 Мне вернули схему. Над этой трассировкой стека есть дополнительная регистрация, которая, к сожалению, выглядит так, как будто она все еще использует неверную стратегию имен:

INFO AvroConverterConfig values:
    schema.registry.url = [http://schema-registry-service:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = false
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

У кого-нибудь есть какие-либо подсказки о том, как решить эту проблему? Я использую следующие изображения:

  • confluentinc / ф-Кафка-подключения: 5.2.0
  • confluentinc / ф-Кафка: 5.1.0

Спасибо

1 Ответ

0 голосов
/ 23 апреля 2019

В трассировке lookUpSubjectVersion означает, что он пытался выполнить поиск под /subjects/:name/versions для каждого идентификатора, указанного там, но не смог найти schemaId=7 (Примечание: не версия = 7), хотяне очень ясно из журналов, что :name он пытается использовать здесь, но если это не найдено, то вы получите ошибку Subject not found.Если бы мой PR был принят, имя субъекта было бы более ясным

Я полагаю, это может быть связано с использованием RecordNameStrategy. Глядя на PR для этого свойства , я понял, что оно действительно было проверено только на коде производителя / потребителя, а не только в Connect API.По сравнению со стандартным поведением TopicNameStrategy

Который, как вы можете видеть, он пытался использовать

value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

Но если присмотреться, я думаю, что вы, возможно, настроили его неправильно.

Аналогично тому, как у вас есть value.converter.schema.registry.url, вам на самом деле нужно будет установить value.converter.value.subject.name.strategy вместо этого.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...