Я написал скрипт для преобразования набора файлов Avro с различными схемами: поля типа bytes с логическим типом decimal заменяются на тип double. Во всех схемах я изменяю:
'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
'precision': 20,
'connect.parameters': {'scale': '8',
'connect.decimal.precision': '20'},
'scale': 8,
'logicalType': 'decimal',
'type': 'bytes',
'connect.version': 1}}]}
на просто:
'type':'double'
И в остальном сохраняю схему неизменной.
Затем я использую следующий код:
# Read all records from the input Avro file into a list.
with open(input_file, 'rb') as src:
    records = list(fastavro.reader(src))

# Re-serialize the records with the adjusted schema and save locally.
with open(output_file, 'wb') as dst:
    fastavro.writer(dst, fixed_schema, records)
Этот код десериализует и повторно сериализует файлы Avro (fixed_schema идентична исходной схеме, за исключением упомянутой выше замены типа bytes на double).
Для 3 схем это работает отлично - fastavro правильно читает значения типа bytes как Decimal, которые он затем может записать в тип double. В четвертом случае я получаю следующую ошибку. Я публикую ниже схемы (до и после).
ValueError: {'NSG': {'apr': Decimal('0.08325675'), 'interestRate':
Decimal('0.05080000'), 'fee': Decimal('950.00000000'), 'monthlyPayment':
Decimal('378.57427442')}, 'SG': {'apr': Decimal('0.08325675'),
'interestRate': Decimal('0.05080000'), 'fee': Decimal('950.00000000'),
'monthlyPayment': Decimal('378.57427442')}} (type <class 'dict'>) do not
match ['null', {'values': {'type': 'record', 'connect.name':
'com.zopa.events.origination.quotation.Price', '__fastavro_parsed': True,
'name': 'com.zopa.events.origination.quotation.Price', 'fields': [{'type':
'double', 'name': 'apr'}, {'type': 'double', 'name': 'fee'}, {'type':
'double', 'name': 'interestRate'}, {'type': 'double', 'name':
'monthlyPayment'}]}, 'type': 'map', 'connect.parameters':
{'avro.java.string': 'String'}, 'avro.java.string': 'String'}]
Что здесь не так? В этом примере, если я обновляю схему только для quotes.amount, fastavro может писать без проблем, но если я также обновляю её для quotes.prices.sg.apr, я получаю приведённое выше сообщение об ошибке, даже хотя раньше поля были одного и того же типа. Вы сможете воспроизвести это с одной десериализованной записью, опубликованной ниже. Какие изменения мне нужно внести в схему и records для создания нового файла Avro?
Схема до:
{'connect.name': 'com.xxxxx.events.origination.quotation.QuotesCreated',
'fields': [{'doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
'name': 'header',
'type': {'connect.name': 'com.xxxxx.events.Header',
'fields': [{'name': 'recordId',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'default': [],
'name': 'causationIds',
'type': {'type': 'array',
'items': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}},
'connect.default': []}},
{'name': 'correlationId',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'name': 'eventTime',
'type': {'connect.name': 'org.apache.kafka.connect.data.Timestamp',
'logicalType': 'timestamp-millis',
'type': 'long',
'connect.version': 1}},
{'default': {},
'name': 'otherMetadata',
'type': {'values': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}},
'avro.java.string': 'String',
'type': 'map',
'connect.default': {},
'connect.parameters': {'avro.java.string': 'String'}}}],
'connect.parameters': {'connect.record.doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'},
'namespace': 'com.xxxxx.events',
'doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
'name': 'Header',
'type': 'record',
'connect.doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'}},
{'name': 'channelCode',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'doc': 'ID of a borrower application, will be reused across various business flows',
'name': 'applicationId',
'type': {'connect.doc': 'ID of a borrower application, will be reused across various business flows',
'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
'name': 'requestId',
'type': {'connect.doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'doc': 'List of quotes priced for this borrower application',
'name': 'quotes',
'type': {'connect.doc': 'List of quotes priced for this borrower application',
'type': 'array',
'items': {'connect.name': 'com.xxxxx.events.origination.quotation.PricedQuote',
'name': 'PricedQuote',
'type': 'record',
'fields': [{'name': 'id',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'name': 'amount',
'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
'precision': 20,
'connect.parameters': {'scale': '8',
'connect.decimal.precision': '20'},
'scale': 8,
'logicalType': 'decimal',
'type': 'bytes',
'connect.version': 1}},
{'name': 'term', 'type': 'int'},
{'default': False,
'name': 'isRequestedQuote',
'type': {'type': 'boolean', 'connect.default': False}},
{'default': None,
'name': 'riskScore',
'type': ['null',
{'connect.name': 'com.xxxxx.events.origination.quotation.RiskScore',
'name': 'RiskScore',
'type': 'record',
'fields': [{'name': 'score', 'type': 'float'},
{'default': 'UNKNOWN',
'name': 'riskMarket',
'type': {'connect.name': 'com.xxxxx.events.origination.decisioning.RiskMarket',
'type': 'string',
'connect.default': 'UNKNOWN',
'connect.parameters': {'io.confluent.connect.avro.Enum.ASTAR': 'ASTAR',
'io.confluent.connect.avro.Enum.A1': 'A1',
'io.confluent.connect.avro.Enum': 'com.xxxxx.events.origination.decisioning.RiskMarket',
'io.confluent.connect.avro.Enum.D': 'D',
'io.confluent.connect.avro.Enum.A2': 'A2',
'io.confluent.connect.avro.Enum.UNKNOWN': 'UNKNOWN',
'connect.enum.doc': None,
'io.confluent.connect.avro.Enum.N': 'N',
'io.confluent.connect.avro.Enum.B': 'B',
'io.confluent.connect.avro.Enum.E': 'E',
'io.confluent.connect.avro.Enum.C1': 'C1'}}}]}]},
{'default': None,
'name': 'affordability',
'type': ['null',
{'connect.name': 'com.xxxxx.events.origination.quotation.Affordability',
'name': 'Affordability',
'type': 'record',
'fields': [{'name': 'DI', 'type': 'float'},
{'name': 'DTI', 'type': 'float'},
{'name': 'HDTI', 'type': 'float'},
{'name': 'LDTI', 'type': 'float'}]}]},
{'default': None,
'name': 'prices',
'type': ['null',
{'values': {'connect.name': 'com.xxxxx.events.origination.quotation.Price',
'name': 'Price',
'type': 'record',
'fields': [{'name': 'apr',
'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
'precision': 20,
'connect.parameters': {'scale': '8',
'connect.decimal.precision': '20'},
'scale': 8,
'logicalType': 'decimal',
'type': 'bytes',
'connect.version': 1}},
{'name': 'fee',
'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
'precision': 20,
'connect.parameters': {'scale': '8',
'connect.decimal.precision': '20'},
'scale': 8,
'logicalType': 'decimal',
'type': 'bytes',
'connect.version': 1}},
{'name': 'interestRate',
'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
'precision': 20,
'connect.parameters': {'scale': '8',
'connect.decimal.precision': '20'},
'scale': 8,
'logicalType': 'decimal',
'type': 'bytes',
'connect.version': 1}},
{'name': 'monthlyPayment',
'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
'precision': 20,
'connect.parameters': {'scale': '8',
'connect.decimal.precision': '20'},
'scale': 8,
'logicalType': 'decimal',
'type': 'bytes',
'connect.version': 1}}]},
'avro.java.string': 'String',
'type': 'map',
'connect.parameters': {'avro.java.string': 'String'}}]}]}}}],
'namespace': 'com.xxxxx.events.origination.quotation',
'name': 'QuotesCreated',
'type': 'record',
'connect.version': 3}
Схема после:
{'connect.name': 'com.xxxxx.events.origination.quotation.QuotesCreated',
'fields': [{'doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
'name': 'header',
'type': {'connect.name': 'com.xxxxx.events.Header',
'fields': [{'name': 'recordId',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'default': [],
'name': 'causationIds',
'type': {'type': 'array',
'items': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}},
'connect.default': []}},
{'name': 'correlationId',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'name': 'eventTime',
'type': {'connect.name': 'org.apache.kafka.connect.data.Timestamp',
'logicalType': 'timestamp-millis',
'type': 'long',
'connect.version': 1}},
{'default': {},
'name': 'otherMetadata',
'type': {'values': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}},
'avro.java.string': 'String',
'type': 'map',
'connect.default': {},
'connect.parameters': {'avro.java.string': 'String'}}}],
'connect.parameters': {'connect.record.doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'},
'namespace': 'com.xxxxx.events',
'doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
'name': 'Header',
'type': 'record',
'connect.doc': 'This file represents a standardised Xxxxx header that\r\n would be expected on each message we emit to Kafka.\r\n\r\n Check out this link for the human definition and reasoning\r\n https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'}},
{'name': 'channelCode',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'doc': 'ID of a borrower application, will be reused across various business flows',
'name': 'applicationId',
'type': {'connect.doc': 'ID of a borrower application, will be reused across various business flows',
'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
'name': 'requestId',
'type': {'connect.doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'doc': 'List of quotes priced for this borrower application',
'name': 'quotes',
'type': {'connect.doc': 'List of quotes priced for this borrower application',
'type': 'array',
'items': {'connect.name': 'com.xxxxx.events.origination.quotation.PricedQuote',
'name': 'PricedQuote',
'type': 'record',
'fields': [{'name': 'id',
'type': {'avro.java.string': 'String',
'type': 'string',
'connect.parameters': {'avro.java.string': 'String'}}},
{'name': 'amount', 'type': 'double'},
{'name': 'term', 'type': 'int'},
{'default': False,
'name': 'isRequestedQuote',
'type': {'type': 'boolean', 'connect.default': False}},
{'default': None,
'name': 'riskScore',
'type': ['null',
{'connect.name': 'com.xxxxx.events.origination.quotation.RiskScore',
'name': 'RiskScore',
'type': 'record',
'fields': [{'name': 'score', 'type': 'float'},
{'default': 'UNKNOWN',
'name': 'riskMarket',
'type': {'connect.name': 'com.xxxxx.events.origination.decisioning.RiskMarket',
'type': 'string',
'connect.default': 'UNKNOWN',
'connect.parameters': {'io.confluent.connect.avro.Enum.ASTAR': 'ASTAR',
'io.confluent.connect.avro.Enum.A1': 'A1',
'io.confluent.connect.avro.Enum': 'com.xxxxx.events.origination.decisioning.RiskMarket',
'io.confluent.connect.avro.Enum.D': 'D',
'io.confluent.connect.avro.Enum.A2': 'A2',
'io.confluent.connect.avro.Enum.UNKNOWN': 'UNKNOWN',
'connect.enum.doc': None,
'io.confluent.connect.avro.Enum.N': 'N',
'io.confluent.connect.avro.Enum.B': 'B',
'io.confluent.connect.avro.Enum.E': 'E',
'io.confluent.connect.avro.Enum.C1': 'C1'}}}]}]},
{'default': None,
'name': 'affordability',
'type': ['null',
{'connect.name': 'com.xxxxx.events.origination.quotation.Affordability',
'name': 'Affordability',
'type': 'record',
'fields': [{'name': 'DI', 'type': 'float'},
{'name': 'DTI', 'type': 'float'},
{'name': 'HDTI', 'type': 'float'},
{'name': 'LDTI', 'type': 'float'}]}]},
{'default': None,
'name': 'prices',
'type': ['null',
{'values': {'connect.name': 'com.xxxxx.events.origination.quotation.Price',
'name': 'Price',
'type': 'record',
'fields': [{'name': 'apr', 'type': 'double'},
{'name': 'fee', 'type': 'double'},
{'name': 'interestRate', 'type': 'double'},
{'name': 'monthlyPayment', 'type': 'double'}]},
'avro.java.string': 'String',
'type': 'map',
'connect.parameters': {'avro.java.string': 'String'}}]}]}}}],
'namespace': 'com.xxxxx.events.origination.quotation',
'name': 'QuotesCreated',
'type': 'record',
'connect.version': 3}
Одна десериализованная запись:
[{'applicationId': 'aa1b6599-5b6f-e811-80c8-005056992ece',
'channelCode': 'referrer-ppc',
'header': {'causationIds': ['3904682c-144c-423e-bd69-1a717794ca72',
'1adb1985-244a-415a-b105-97e0d13f2dee',
'b5461967-fc6f-407f-ba82-f6a2a025a012',
'cf490394-aaf6-4abd-a265-e116339b36cb',
'7ad758c6-2faf-476b-9902-088cb549819c'],
'correlationId': '34d2c917-e368-441e-bac1-fb0512971181',
'eventTime': datetime.datetime(2018, 6, 13, 22, 46, 31, 204000, tzinfo=<fastavro._timezone.UTCTzinfo object at 0x7f6f53860f60>),
'otherMetadata': {},
'recordId': '0d2d603c-0c46-4cf4-92d2-3114a6bbc3eb'},
'quotes': [{'affordability': {'DI': 1772.1600341796875,
'DTI': 48.439998626708984,
'HDTI': 60.61000061035156,
'LDTI': 15.40999984741211},
'amount': Decimal('15500.00000000'),
'id': 'cd147e61-af7b-40d9-85b2-d3d0e70c0ed7',
'isRequestedQuote': False,
'prices': {'NSG': {'apr': Decimal('0.08325675'),
'fee': Decimal('950.00000000'),
'interestRate': Decimal('0.05080000'),
'monthlyPayment': Decimal('378.57427442')},
'SG': {'apr': Decimal('0.08325675'),
'fee': Decimal('950.00000000'),
'interestRate': Decimal('0.05080000'),
'monthlyPayment': Decimal('378.57427442')}},
'riskScore': {'riskMarket': 'A1', 'score': 0.9677764773368835},
'term': 48},
{'affordability': {'DI': 2036.739990234375,
'DTI': 10.9399995803833,
'HDTI': 47.9900016784668,
'LDTI': 2.7799999713897705},
'amount': Decimal('3500.00000000'),
'id': 'cbe92ade-529b-480c-9a2b-7a41c86cfb24',
'isRequestedQuote': False,
'prices': {'NSG': {'apr': Decimal('0.09938296'),
'fee': Decimal('90.00000000'),
'interestRate': Decimal('0.08760000'),
'monthlyPayment': Decimal('73.52774708')},
'SG': {'apr': Decimal('0.09938296'),
'fee': Decimal('90.00000000'),
'interestRate': Decimal('0.08760000'),
'monthlyPayment': Decimal('73.52774708')}},
'riskScore': {'riskMarket': 'A2', 'score': 0.9523985981941223},
'term': 60}],
'requestId': 'ac1b6599-5b6f-e811-80c8-005056992ece'}
]