Does avro-python3 not provide schema evolution? - PullRequest
0 votes
/ 07 May 2019

I am trying to reproduce a schema evolution case with avro-python3 (backward compatibility).

I have two schemas:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

schema_v1 = avro.schema.Parse("""
{
     "type": "record",
     "namespace": "com.example",
     "name": "CustomerV1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
       { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
       { "name": "age", "type": "int", "doc": "Age at the time of registration" },
       { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
       { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
       { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
     ]
}
""")

schema_v2 = avro.schema.Parse("""
{
     "type": "record",
     "namespace": "com.example",
     "name": "CustomerV2",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
       { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
       { "name": "age", "type": "int", "doc": "Age at the time of registration" },
       { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
       { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
       { "name": "phone_number", "type": ["null", "string"], "default": null, "doc": "optional phone number"},
       { "name": "email", "type": "string", "default": "missing@example.com", "doc": "email address"}
     ]
}
""")

The second schema does not have the automated_email field, but it has two additional fields: phone_number and email.

According to the Avro schema evolution rules, if I write an Avro record with schema_v1:

writer = DataFileWriter(open("customer_v1.avro", "wb"), DatumWriter(), schema_v1)
writer.append({
    "first_name": "John",
    "last_name": "Doe",
    "age": 34,
    "height": 178.0,
    "weight": 75.0,
    "automated_email": True
})
writer.close()

... I should be able to read it with schema_v2, as long as the fields missing from the written data have default values:

reader = DataFileReader(open("customer_v1.avro", "rb"), DatumReader(reader_schema=schema_v2))

for field in reader:
    print(field)

reader.close()

But I get the following error:

SchemaResolutionException: Schemas do not match.

I know this works in Java. It is an example from a video course. Is there a way to make it work in Python?

1 Answer

1 vote
/ 08 May 2019

fastavro, an alternative Python implementation, handles this just fine.
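
One possible reason the two libraries differ, offered as an assumption and not verified in this thread: the Avro specification says two record schemas match only when they have the same (unqualified) name, and the schemas in the question are named CustomerV1 and CustomerV2. A reader that enforces that rule strictly would reject the pair before it ever looks at field defaults. The rule itself can be sketched in plain Python; the helper names below are hypothetical and are not part of avro or fastavro.

```python
def unqualified_name(schema):
    """Return the record name with any namespace prefix stripped."""
    return schema["name"].rsplit(".", 1)[-1]


def records_match_by_name(writer_schema, reader_schema):
    """Apply the spec's record-matching rule: both records, same unqualified name."""
    return (
        writer_schema["type"] == reader_schema["type"] == "record"
        and unqualified_name(writer_schema) == unqualified_name(reader_schema)
    )


# Field lists are omitted because only the names matter for this check.
writer = {"type": "record", "namespace": "com.example", "name": "CustomerV1", "fields": []}
reader = {"type": "record", "namespace": "com.example", "name": "CustomerV2", "fields": []}

# Hypothetical variant of the reader schema that reuses the writer's name.
renamed_reader = dict(reader, name="CustomerV1")

print(records_match_by_name(writer, reader))
print(records_match_by_name(writer, renamed_reader))
```

If this is indeed the cause, giving both schema versions the same record name may be enough for avro-python3 to resolve them, but that has not been tested here.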

Here is the code for writing with the first schema:

s1 = {
    "type": "record",
    "namespace": "com.example",
    "name": "CustomerV1",
    "fields": [
        {"name": "first_name", "type": "string", "doc": "First Name of Customer"},
        {"name": "last_name", "type": "string", "doc": "Last Name of Customer"},
        {"name": "age", "type": "int", "doc": "Age at the time of registration"},
        {
            "name": "height",
            "type": "float",
            "doc": "Height at the time of registration in cm",
        },
        {
            "name": "weight",
            "type": "float",
            "doc": "Weight at the time of registration in kg",
        },
        {
            "name": "automated_email",
            "type": "boolean",
            "default": True,
            "doc": "Field indicating if the user is enrolled in marketing emails",
        },
    ],
}

record = {
    "first_name": "John",
    "last_name": "Doe",
    "age": 34,
    "height": 178.0,
    "weight": 75.0,
    "automated_email": True,
}

import fastavro

with open("test.avro", "wb") as fp:
    fastavro.writer(fp, fastavro.parse_schema(s1), [record])

And for reading with the second schema:

s2 = {
    "type": "record",
    "namespace": "com.example",
    "name": "CustomerV2",
    "fields": [
        {"name": "first_name", "type": "string", "doc": "First Name of Customer"},
        {"name": "last_name", "type": "string", "doc": "Last Name of Customer"},
        {"name": "age", "type": "int", "doc": "Age at the time of registration"},
        {
            "name": "height",
            "type": "float",
            "doc": "Height at the time of registration in cm",
        },
        {
            "name": "weight",
            "type": "float",
            "doc": "Weight at the time of registration in kg",
        },
        {
            "name": "phone_number",
            "type": ["null", "string"],
            "default": None,
            "doc": "optional phone number",
        },
        {
            "name": "email",
            "type": "string",
            "default": "missing@example.com",
            "doc": "email address",
        },
    ],
}

import fastavro

with open("test.avro", "rb") as fp:
    for record in fastavro.reader(fp, fastavro.parse_schema(s2)):
        print(record)

The output includes the new fields, as expected:

{'first_name': 'John', 'last_name': 'Doe', 'age': 34, 'height': 178.0, 'weight': 75.0, 'phone_number': None, 'email': 'missing@example.com'}
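
The rule behind this output can be sketched without any Avro library. For records, a field that exists only in the written data is dropped, and a field that exists only in the reader schema takes its default. The function below is a simplified illustration under stated assumptions: resolve_record is a hypothetical helper, it works on already-decoded dictionaries, and it ignores type promotion, unions and nested records, so it does not reflect how fastavro is implemented.

```python
# Sentinel so an explicit default of None is distinguishable from "no default".
_NO_DEFAULT = object()


def resolve_record(written_record, reader_schema):
    """Project a record written with an older schema onto the reader schema."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in written_record:
            # Field known to both sides: keep the written value.
            resolved[name] = written_record[name]
        else:
            # Field only in the reader schema: it must declare a default.
            default = field.get("default", _NO_DEFAULT)
            if default is _NO_DEFAULT:
                raise ValueError(f"reader field {name!r} has no default")
            resolved[name] = default
    # Fields present only in written_record (automated_email) are dropped.
    return resolved


# Trimmed copy of the second schema: only the keys this sketch uses.
reader_schema = {
    "type": "record",
    "name": "CustomerV2",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "height", "type": "float"},
        {"name": "weight", "type": "float"},
        {"name": "phone_number", "type": ["null", "string"], "default": None},
        {"name": "email", "type": "string", "default": "missing@example.com"},
    ],
}

written_record = {
    "first_name": "John",
    "last_name": "Doe",
    "age": 34,
    "height": 178.0,
    "weight": 75.0,
    "automated_email": True,
}

resolved = resolve_record(written_record, reader_schema)
print(resolved)
```

For this record the sketch yields the same dictionary as the fastavro output above: automated_email disappears, while phone_number and email are filled from their defaults.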