Я пытаюсь записать некоторое десятичное значение как avro, используя python. Код работает нормально без десятичного значения. Если я добавлю десятичное значение, я получу исключение AvroTypeException: элемент {blah} не является примером схемы {blah..blah}. Вот мой python код
#trial with avro library
import avro.schema
import avro.io
import io
from decimal import *
from decimal import Decimal as D
schema = """{"name":"DEPARTMENT_111","type":"record","fields":[{"name":"DEPARTMENT_NAME","type":["null","string"],"default":null},{"name":"DEPARTMENT_ID","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":10}]},{"name":"ETL_BATCH_SK","type":["null","long"],"default":null},{"name":"INSERT_TS","type":["null","string"],"default":null},{"name":"OP_CODE","type":["null","string"],"default":null},{"name":"PROCESSED_FLAG","type":["null","string"],"default":null}]}"""
print(format_json(json.loads(schema)))
parsed_schema = avro.schema.Parse(schema)
writer = avro.io.DatumWriter(parsed_schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, D):
return float(obj)
return json.JSONEncoder.default(self, obj)
sample_department_data = {
"DEPARTMENT_NAME":"Physics",
"DEPARTMENT_ID" : decimal.Decimal("201.0"),
"ETL_BATCH_SK" : 952879684,
"OP_CODE":"I",
"PROCESSED_FLAG":"False"
}
sample_department_json = json.dumps(sample_department_data, cls=DecimalEncoder)
writer.write(sample_department_json, encoder)
Однако я получаю следующую ошибку:
---------------------------------------------------------------------------
AvroTypeException Traceback (most recent call last)
<ipython-input-42-d78ba6b385e2> in <module>()
47 }
48 sample_department_json = json.dumps(sample_department_data, cls=DecimalEncoder)
---> 49 writer.write(sample_department_json, encoder)
50
51 raw_bytes = bytes_writer.getvalue()
~/.pyenv/versions/3.6.0/lib/python3.6/site-packages/avro/io.py in write(self, datum, encoder)
815 # validate datum
816 if not Validate(self.writer_schema, datum):
--> 817 raise AvroTypeException(self.writer_schema, datum)
818
819 self.write_data(self.writer_schema, datum, encoder)
AvroTypeException: The datum {"DEPARTMENT_NAME": "Physics", "DEPARTMENT_ID": 201.0, "ETL_BATCH_SK": 952879684, "OP_CODE": "I", "PROCESSED_FLAG": "False"} is not an example of the schema {
"type": "record",
"name": "DEPARTMENT_111",
"fields": [
{
"type": [
"null",
"string"
],
"name": "DEPARTMENT_NAME",
"default": null
},
{
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 10
}
],
"name": "DEPARTMENT_ID"
},
{
"type": [
"null",
"long"
],
"name": "ETL_BATCH_SK",
"default": null
},
{
"type": [
"null",
"string"
],
"name": "INSERT_TS",
"default": null
},
{
"type": [
"null",
"string"
],
"name": "OP_CODE",
"default": null
},
{
"type": [
"null",
"string"
],
"name": "PROCESSED_FLAG",
"default": null
}
]
}
Что я делаю не так?