Как найти идентификатор схемы из реестра схем, используемого для записей avro, при чтении из потребителя kafka - PullRequest
2 голосов
/ 06 мая 2020

Мы используем реестр схем для хранения схем, и сообщения сериализуются в avro и отправляются в темы kafka.

Хотел узнать при чтении данных от потребителя, как найти идентификатор схемы, для которой сериализуется запись avro. Нам необходим этот идентификатор схемы, чтобы отслеживать изменения, добавляется ли новый столбец в таблицу. Если новые столбцы добавляются или удаляются, в реестре схемы будет сгенерирован новый идентификатор схемы, и как получить этот идентификатор у потребителя.

consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                        auto_offset_reset = conf['AUTO_OFFSET'],
                        enable_auto_commit = conf['AUTO_COMMIT'],
                        auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                        )
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)

Из приведенного выше кода message.key печатает ключ для этого конкретного запись, и как нам найти соответствующий идентификатор схемы, который используется потребителем для десериализации записи .?

curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2

{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"

Здесь от потребителя мы хотели получить значение идентификатора "id":33

Пожалуйста, предложите по этому поводу.

1 Ответ

1 голос
/ 06 мая 2020

Что вы действительно можете сделать, так это получить последний идентификатор схемы для данной темы топа c:

Используя confluent-kafka-python

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({
    'url': 'http://localhost:8081',
    'ssl.certificate.location': '/path/to/cert',  # optional
    'ssl.key.location': '/path/to/key'  # optional
})

value_schema = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-value")[1]
key_schema= sr.get_latest_schema("helpkit_internal.helpkit_support.agents-key")[1]

Использование SchemaRegistryClient

Получение схемы по имени субъекта

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='shelpkit_internal.helpkit_support.agents-value', version='latest')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...