Я использую kafka- python 2.0.1 для потребления данных avro. Ниже приведен код, который я пробовал:
from kafka import KafkaConsumer
import avro.schema
from avro.io import DatumReader, BinaryDecoder
import io
schema_path="schema.avsc"
schema = avro.schema.parse(open(schema_path).read())
reader = DatumReader(schema)
consumer = KafkaConsumer(
bootstrap_servers='xxx.xxx.xxx.xxx:9093',
security_protocol='SASL_SSL',
sasl_mechanism = 'GSSAPI',
auto_offset_reset = 'latest',
ssl_check_hostname=False,
api_version=(1,0,0))
consumer.subscribe(['test'])
for message in consumer:
message_val = message.value
print(message_val)
bytes_reader = io.BytesIO(message_val)
bytes_reader.seek(5)
decoder = avro.io.BinaryDecoder(bytes_reader)
record = reader.read(decoder)
print(record)
Я получаю следующую ошибку:
avro.io.SchemaResolutionException: не удается получить доступ к индексу ветки 55 для объединения с 2 ветвями Схема писателя: ["null", "int"] Схема читателя: ["null", "int"]
Кто-нибудь может предложить, что может быть возможной причиной этой ошибки? Я уже следил за этой цепочкой, чтобы пропустить начальные 5 байтов:
Как декодировать / десериализовать Avro с помощью Python из Kafka