Использование SchemaRegistryClient для десериализации сообщения AVRO в Python - PullRequest
0 голосов
/ 01 августа 2020

Мы пытаемся использовать сообщения AVRO, поступающие из других систем. Я могу прочитать сообщение AVRO, когда я указываю схему как файл (.avs c), используя приведенный ниже код,

import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)

Однако теперь мне нужно прочитать схему из реестра схем. Вместо этого URL-адрес

http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema

Я извлекаю URL-адрес из настраиваемого атрибута входящего сообщения 'schema'. Теперь, чтобы получить схему из URL-адреса, я использую приведенный ниже код:

def fetch_schema(IP, subject, version):
    sr = SchemaRegistryClient(IP)
    schema = sr.get_schema(subject, version=version).schema
    return schema

С тем же кодом, который использовался выше для десериализации сообщения, теперь я получаю следующую ошибку:

AttributeError: 'AvroSchema' object has no attribute 'type' 

на строка,

rec = reader.read(decoder) 

Я сравнил тип переменной 'schema', когда я читаю из файла, и когда я извлекаю из URL,

from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>

Они разные и отсюда наверное проблема. Ищу какое-то направление здесь. Спасибо!

...