Обработка вложенных схем AVRO с помощью Python3 - PullRequest
0 голосов
/ 03 марта 2019

Я использую avro1.8.2 + python3.7 (pip install avro-python3) для обработки формата AVRO.

Вот пример кода с веб-сайта AVRO

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema = avro.schema.parse(open("user.avsc", "rb").read())

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Alyssa", "favorite_number": 256})
writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
writer.close()

reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
    print user
reader.close()

Этот код не работает, потому что метод parse был переименован в Parse, а второй параметр, необходимый для поддержки вложенной схемы, был удален.

Поэтому вопрос заключается в том, какAVRO для чтения / записи с вложенными схемами в python3?

1 Ответ

0 голосов
/ 03 марта 2019

Прочитав исходный код библиотеки Avro, я нашел способ сделать это.Вот код

import json

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

def create_schema():
    names = avro.schema.Names()
    load = lambda dict_value: avro.schema.SchemaFromJSONData(dict_value, names=names)

    transaction_schema_dict = {
        "namespace": "myavro",
        "type": "record",
        "name": "Transaction",
        "fields": [
            {"name": "name", "type": "string"},
        ]
    }
    account_schema_dict = {
        "namespace": "myavro",
        "type": "record",
        "name": "Account",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "transaction",  "type": ["null", {'type': 'array', 'items': 'Transaction'}], 'default': "null"},
        ]
    }

    load(transaction_schema_dict)
    return load(account_schema_dict)

def write_avro_file(file_path, schema, data):
    with open(file_path, 'wb') as f, DataFileWriter(f, DatumWriter(), schema) as writer:
        writer.append(data)

def print_avro_file(file_path):
    with open(file_path, 'rb') as f, DataFileReader(f, DatumReader()) as reader:
        for account in reader:
            print(account)

def run():
    schema = create_schema()
    file_path = 'account.avro'
    data = {
        'name': 'my account',
        'transaction': [
            { 'name': 'my transaction 1' },
            { 'name': 'my transaction 2' },
        ]
    }
    write_avro_file(file_path, schema, data)
    print_avro_file(file_path)

run()

Ключ должен использовать функцию SchemaFromJSONData вместо Parse и назначать один и тот же объект Names, чтобы схемы могли ссылаться друг на друга.Обратите внимание, что порядок загрузки вызовов схемы имеет значение.

...