Мы пытаемся использовать сообщения AVRO, поступающие из других систем. Я могу прочитать сообщение AVRO, когда я указываю схему как файл (.avs c), используя приведенный ниже код,
import avro.schema
from avro.io import DatumReader, BinaryDecoder
...
schema = avro.schema.Parse(open("schema.avsc", "rb").read())
...
bytes_reader = io.BytesIO(element) # element is the serialized message
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
rec = reader.read(decoder)
Однако теперь мне нужно прочитать схему из реестра схем. Вместо этого URL-адрес
http://<IP>:<PORT>/subjects/<SUBJECT>/versions/<VERSION>/schema
Я извлекаю URL-адрес из настраиваемого атрибута входящего сообщения 'schema'. Теперь, чтобы получить схему из URL-адреса, я использую приведенный ниже код:
def fetch_schema(IP, subject, version):
sr = SchemaRegistryClient(IP)
schema = sr.get_schema(subject, version=version).schema
return schema
С тем же кодом, который использовался выше для десериализации сообщения, теперь я получаю следующую ошибку:
AttributeError: 'AvroSchema' object has no attribute 'type'
на строка,
rec = reader.read(decoder)
Я сравнил тип переменной 'schema', когда я читаю из файла, и когда я извлекаю из URL,
from file, the schema type is : <class 'avro.schema.RecordSchema'>
from URL, the schema type is : <class 'schema_registry.client.schema.AvroSchema'>
Они разные и отсюда наверное проблема. Ищу какое-то направление здесь. Спасибо!