Добавить схему в реестр схем с указанным идентификатором - PullRequest
0 голосов
/ 21 марта 2019

Мы уже более года используем реестр Confluent Schema с KafkaStreams, и все работает хорошо;до вчерашнего дня.

В среде UAT мы, кажется, удалили тему Schema, и одно из наших приложений начало сбой с сообщением

[ERROR] LogAndFailExceptionHandler - исключение, обнаруженное во времяДесериализация, taskId: 0_13, тема: TOPIC_NAME, раздел: 13, смещение: 0 org.apache.kafka.common.errors.SerializationException: Ошибка при получении схемы Avro для идентификатора 1531

Я проверил реестр схеми заметил, что тема отсутствует, и запрашивает идентификатор 1531, который указан в ошибке с curl, такой как:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531

И получил обратно:

{"error_code":40403,"message":"Schema not found"}

Я наивно только что попытался зарегистрироватьсяСхема снова, не задумываясь об этом, и это сработало, но идентификатор, с которым была зарегистрирована схема, отличался от предыдущего идентификатора 1531.

Мне нужна схема, зарегистрированная для идентификатора 1531, поскольку существующие сообщения в темеуже содержит этот идентификатор 1531 в волшебный байт.

Я проверил документы API на https://docs.confluent.io/current/schema-registry/docs/develop/api.html, но не сделалничего не нужно для установки заданного идентификатора для схемы.

Есть ли способ принудительно применить схему к определенному идентификатору с реестром схемы?

Мне известны некоторые решения для резервного копирования, но я ищуисправление, которое, мы надеемся, предотвратит потерю данных или чрезвычайные меры по исправлению данных темы.

1 Ответ

0 голосов
/ 21 марта 2019

Есть ли способ принудительно применить схему к определенному идентификатору с реестром схемы?

Нет.


Идентификатор 1531 на самом деле не "пропал", кстати, он просто помечен как удаленный в реестре (используйте тему _schemas, чтобы увидеть его).


На самом деле нет способа обойти ошибку, о которой я знаю, когда вы используете KafkaAvroDeserializer.Вам нужно будет использовать ByteArrayDeserializer, затем «исправить» или «найти» правильный идентификатор с помощью клиента реестра схемы, а затем десериализовать оставшуюся часть сообщения.

Другим вариантом будет сброс вашей группы потребителей, чтобы вы полностью пропустили эти сообщения, или настройку обработки исключений. Обработка плохих сообщений с помощью Kafka Streams API

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...