Проблема записи десятичных данных в формате avro с использованием python - PullRequest
0 голосов
/ 16 апреля 2020

Я пытаюсь записать некоторое десятичное значение как avro, используя python. Код работает нормально без десятичного значения. Если я добавлю десятичное значение, я получу исключение AvroTypeException: элемент {blah} не является примером схемы {blah..blah}. Вот мой python код

#trial with avro library
import avro.schema
import avro.io
import io
from decimal import *
from decimal import Decimal as D

schema = """{"name":"DEPARTMENT_111","type":"record","fields":[{"name":"DEPARTMENT_NAME","type":["null","string"],"default":null},{"name":"DEPARTMENT_ID","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":10}]},{"name":"ETL_BATCH_SK","type":["null","long"],"default":null},{"name":"INSERT_TS","type":["null","string"],"default":null},{"name":"OP_CODE","type":["null","string"],"default":null},{"name":"PROCESSED_FLAG","type":["null","string"],"default":null}]}"""
print(format_json(json.loads(schema)))
parsed_schema = avro.schema.Parse(schema)


writer = avro.io.DatumWriter(parsed_schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, D):
            return float(obj)
        return json.JSONEncoder.default(self, obj)


sample_department_data = {
    "DEPARTMENT_NAME":"Physics",
    "DEPARTMENT_ID" : decimal.Decimal("201.0"),
    "ETL_BATCH_SK" : 952879684,
    "OP_CODE":"I",
    "PROCESSED_FLAG":"False"
}

sample_department_json = json.dumps(sample_department_data, cls=DecimalEncoder)
writer.write(sample_department_json, encoder)

Однако я получаю следующую ошибку:

---------------------------------------------------------------------------
AvroTypeException                         Traceback (most recent call last)
<ipython-input-42-d78ba6b385e2> in <module>()
     47 }
     48 sample_department_json = json.dumps(sample_department_data, cls=DecimalEncoder)
---> 49 writer.write(sample_department_json, encoder)
     50 
     51 raw_bytes = bytes_writer.getvalue()

~/.pyenv/versions/3.6.0/lib/python3.6/site-packages/avro/io.py in write(self, datum, encoder)
    815     # validate datum
    816     if not Validate(self.writer_schema, datum):
--> 817       raise AvroTypeException(self.writer_schema, datum)
    818 
    819     self.write_data(self.writer_schema, datum, encoder)

AvroTypeException: The datum {"DEPARTMENT_NAME": "Physics", "DEPARTMENT_ID": 201.0, "ETL_BATCH_SK": 952879684, "OP_CODE": "I", "PROCESSED_FLAG": "False"} is not an example of the schema {
  "type": "record",
  "name": "DEPARTMENT_111",
  "fields": [
    {
      "type": [
        "null",
        "string"
      ],
      "name": "DEPARTMENT_NAME",
      "default": null
    },
    {
      "type": [
        "null",
        {
          "type": "bytes",
          "logicalType": "decimal",
          "precision": 38,
          "scale": 10
        }
      ],
      "name": "DEPARTMENT_ID"
    },
    {
      "type": [
        "null",
        "long"
      ],
      "name": "ETL_BATCH_SK",
      "default": null
    },
    {
      "type": [
        "null",
        "string"
      ],
      "name": "INSERT_TS",
      "default": null
    },
    {
      "type": [
        "null",
        "string"
      ],
      "name": "OP_CODE",
      "default": null
    },
    {
      "type": [
        "null",
        "string"
      ],
      "name": "PROCESSED_FLAG",
      "default": null
    }
  ]
}

Что я делаю не так?

1 Ответ

0 голосов
/ 17 апреля 2020

Я пытался понять, что может быть не так, но проблем не вижу. На самом деле, если я использую fastavro, он работает просто отлично, как показано здесь:

from decimal import Decimal
import io
import json
import fastavro

schema = """{"name":"DEPARTMENT_111","type":"record","fields":[{"name":"DEPARTMENT_NAME","type":["null","string"],"default":null},{"name":"DEPARTMENT_ID","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":10}]},{"name":"ETL_BATCH_SK","type":["null","long"],"default":null},{"name":"INSERT_TS","type":["null","string"],"default":null},{"name":"OP_CODE","type":["null","string"],"default":null},{"name":"PROCESSED_FLAG","type":["null","string"],"default":null}]}"""
parsed_schema = fastavro.parse_schema(json.loads(schema))

sample_department_data = {
    "DEPARTMENT_NAME":"Physics",
    "DEPARTMENT_ID" : Decimal("201.0"),
    "ETL_BATCH_SK" : 952879684,
    "OP_CODE":"I",
    "PROCESSED_FLAG":"False"
}

bio = io.BytesIO()
fastavro.writer(bio, parsed_schema, [sample_department_data])

bio.seek(0)
print(list(fastavro.reader(bio)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...