Авро десериализация от Кафки с использованием фаставро - PullRequest
3 голосов
/ 28 апреля 2020

Я создаю приложение, которое получает данные от Кафки. При использовании стандартной библиотеки avro, предоставляемой Apache (https://pypi.org/project/avro-python3/), результаты верны, однако процесс десериализации очень медленный.

class KafkaReceiver:
    data = {}

    def __init__(self, bootstrap='192.168.1.111:9092'):
        self.client = KafkaConsumer(
            'topic',
            bootstrap_servers=bootstrap,
            client_id='app',
            api_version=(0, 10, 1)
        )
        self.schema = avro.schema.parse(open("Schema.avsc", "rb").read())
        self.reader = avro.io.DatumReader(self.schema)

    def do(self):
        for msg in self.client:
            bytes_reader = io.BytesIO(msg.value)
            decoder = BinaryDecoder(bytes_reader)

            self.data = self.reader.read(decoder)

Читая, почему это так медленно, я нашел fastavro, который должен быть намного быстрее. Я использую это следующим образом:

    def do(self):

        schema = fastavro.schema.load_schema('Schema.avsc')
        for msg in self.client:
            bytes_reader = io.BytesIO(msg.value)
            bytes_reader.seek(0)
            for record in reader(bytes_reader, schema):
                self.data = record

И, так как все работает при использовании библиотеки Apache, я ожидаю, что все будет работать так же, как с fastavro. Однако, когда я запускаю это, я получаю

  File "fastavro/_read.pyx", line 389, in fastavro._read.read_map
  File "fastavro/_read.pyx", line 290, in fastavro._read.read_utf8
  File "fastavro/_six.pyx", line 22, in fastavro._six.py3_btou
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xfc in position 3: invalid start byte

Я обычно не программирую на Python, поэтому я точно не знаю, как к этому подойти. Есть идеи?

1 Ответ

0 голосов
/ 29 апреля 2020

fastavro.reader ожидает формат файла avro, который включает заголовок. Похоже, что у вас есть сериализованная запись без заголовка. Я думаю, что вы могли бы прочитать это, используя fastavro.schemaless_reader.

Так что вместо:

for record in reader(bytes_reader, schema):
    self.data = record

Вы бы сделали:

self.data = schemaless_reader(bytes_reader, schema)
...