Соединитель раковины Kafka не работает: схема не найдена; код ошибки: 40403 - PullRequest
0 голосов
/ 17 января 2019

У меня есть соединитель раковины со следующей конфигурацией

{
    "name": "sink-test-mariadb-MY_TOPIC",
    "config": { 
                "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max":"10",
                "topics":"MY_TOPIC",
                "connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
                "auto.create":"false",
                "auto.evolve":"true",
                "table.name.format":"MY_TABLE",
                "pk.mode":"record_value",
                "pk.fields":"ID",
                "insert.mode":"upsert",
                "transforms":"ExtractField",
                "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
                "transforms.ExtractField.field":"data"
        }
}

, и через некоторое время все задачи соединителя завершаются со следующей ошибкой:

{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
            Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
            Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
                at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
                at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
                at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
                at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
                at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
                at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
                at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)",
    "id": 0,
    "worker_id": "localhost:8083"
}

Соединитель управляетсинхронизировать тему с базой данных, но внезапно происходит сбой без какой-либо причины.Я также очень уверен, что схема есть.Его тема появляется в списке, возвращаемом вызовом API реестра схемы localhost:8081/subjects

[
  ...
  MY_TOPIC-value
  ...
]

1 Ответ

0 голосов
/ 17 января 2019

Сообщение по теме Kafka сериализуется с другой версией схемы, чем та, что у вас есть в реестре схем. Возможно, это было сгенерировано инструментом, который записал схему в другой реестр схем или в другую среду? Чтобы иметь возможность десериализовать его, Kafka Connect должен иметь возможность получить идентификатор схемы, который находится в магическом байте в начале сообщения Kafka по теме.

Схема отсутствует в вашем реестре схем, как видно по:

GET /schemas/ids/803
 { "error_code": 40403, "message": "Schema not found" }

Вы можете проверить идентификатор схемы, которую вы делаете , посмотрев на

curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/3/"|jq '.id'
...