kafka- python: avro.io.SchemaResolutionException: невозможно получить доступ к индексу ветки 55 для объединения с 2 ветвями - PullRequest
2 голосов
/ 11 июля 2020

Я использую kafka- python 2.0.1 для потребления данных avro. Ниже приведен код, который я пробовал:

from kafka import KafkaConsumer
import avro.schema
from avro.io import DatumReader, BinaryDecoder
import io

schema_path="schema.avsc"
schema = avro.schema.parse(open(schema_path).read())
reader = DatumReader(schema)


consumer = KafkaConsumer(
        bootstrap_servers='xxx.xxx.xxx.xxx:9093',
        security_protocol='SASL_SSL',
        sasl_mechanism = 'GSSAPI',
        auto_offset_reset = 'latest',
        ssl_check_hostname=False,
        api_version=(1,0,0))

consumer.subscribe(['test'])

for message in consumer:
       message_val = message.value
       print(message_val)
       bytes_reader = io.BytesIO(message_val)
       bytes_reader.seek(5)    
       decoder = avro.io.BinaryDecoder(bytes_reader)    
       record = reader.read(decoder)
       print(record)

Я получаю следующую ошибку:

avro.io.SchemaResolutionException: не удается получить доступ к индексу ветки 55 для объединения с 2 ветвями Схема писателя: ["null", "int"] Схема читателя: ["null", "int"]

Кто-нибудь может предложить, что может быть возможной причиной этой ошибки? Я уже следил за этой цепочкой, чтобы пропустить начальные 5 байтов:

Как декодировать / десериализовать Avro с помощью Python из Kafka

1 Ответ

0 голосов
/ 14 июля 2020

У меня все заработало. Проблема заключалась в ссылке на неправильную схему. Спасибо.

...