Confluent Python Avro Producer: датум {'..'} не является примером схемы - PullRequest
0 голосов
/ 15 октября 2019

Я не могу создать данные для конкретной схемы и не могу понять, почему. Данные примера, включенные в код в качестве словаря, были созданы непосредственно с использованием конфлюэнтного «avro-random-generator», поэтому данные примера должны быть правильными, поскольку они напрямую получены из схемы. И реестр схемы, и генератор случайных чисел Avro являются инструментами слияния, поэтому их инструменты не могут создавать примеры данных, которые не работают с реестром их схемы.

Это схема:

{
  "type": "record",
  "name": "schemaV1",
  "namespace": "net.avro.schemaV1",
  "doc": "",
  "fields": [
    {
      "name": "orderId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "offerId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "redeemerId",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "eventCancellationType",
      "type": "int",
      "doc": ""
    },
    {
      "name": "ruleIds",
      "type": {
        "type": "array",
        "items": {
          "type": "string",
          "avro.java.string": "String"
        },
        "doc": ""
      }
    },
    {
      "name": "eventOriginator",
      "type": {
        "type": "record",
        "name": "AvroEventPartnerV1",
        "doc": "",
        "fields": [
          {
            "name": "partnerShortName",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "doc": ""
          },
          {
            "name": "businessUnitShortName",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ],
            "doc": "",
            "default": null
          },
          {
            "name": "branchShortName",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ],
            "doc": "",
            "default": null
          }
        ]
      }
    },
    {
      "name": "roundedDelta",
      "doc": "",
      "type": {
        "type": "record",
        "name": "AvroAmountV1",
        "doc": "Amount with a currency",
        "fields": [
          {
            "name": "amount",
            "type": {
              "type": "bytes",
              "logicalType": "decimal",
              "precision": 21,
              "scale": 3
            },
            "doc": "The amount as a decimal number"
          },
          {
            "name": "currency",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "doc": ""
          }
        ]
      }
    },
    {
      "name": "rewardableLegalDelta",
      "type": [
        "null",
        "AvroAmountV1"
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "receiptNumber",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "referenceReceiptNumber",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "eventEffectiveTime",
      "type": {
        "type": "long"
      },
      "doc": ""
    }
  ]
}

Это мой сценарий:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer, ClientError, ValueSerializerError

    BOOTSTRAP_SERVER = 'localhost:9092'
    SCHEMA_REGISTRY = 'http://localhost:8081'
    TOPIC = 'topicV1'
    SCHEMA_PATH = 'schemas/schemaV1.avsc'


    def schemaReader(SCHEMA_PATH):
        with open(SCHEMA_PATH, 'r') as file:
            data = file.read()

        return data


    def main():
        kafka_config = {
            'bootstrap.servers': BOOTSTRAP_SERVER,
            'schema.registry.url': SCHEMA_REGISTRY
        }

        value_schema = avro.loads( schemaReader(SCHEMA_PATH) )


        null = None

        value = {
      "orderId": "a9bcc55f-e2c0-43d6-b793-ff5f295d051d",
      "offerId": "119475017578242889",
      "redeemerId": "1176a01b-b2dc-45a9-91cc-232361e14f99",
      "eventCancellationType": 0,
      "ruleIds": ["ID-IPM00001"],
      "eventOriginator": {"partnerShortName": 
      "partner","businessUnitShortName": null,"branchShortName": null},
      "roundedDelta": {"amount": "\u8463","currency": "PTS"},
      "rewardableLegalDelta": {"amount": "\u8463","currency": "EUR"},
      "receiptNumber": "19b2ff68-ed06-48f0-9ce9-d697c0eadc19",
      "referenceReceiptNumber": null,
      "eventEffectiveTime": 1569494696656
     }


        avroProducer = AvroProducer(kafka_config, default_value_schema=value_schema )
        avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
        avroProducer.flush()

    if __name__== "__main__":
        main()

Это трассировка, которую я получаю:

  File "producer.py", line 64, in <module>
    main()
  File "producer.py", line 60, in main
    avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 115, in encode_record_with_schema
    return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 149, in encode_record_with_schema_id
    writer(record, outf)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
    return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
  File "/apps/python/python2.7/lib/python2.7/site-packages/avro/io.py", line 1042, in write
    raise AvroTypeException(self.writers_schema, datum)

avro.io.AvroTypeException: The datum {'..'} is not an example of the schema { ..}

1 Ответ

0 голосов
/ 08 ноября 2019

Кажется, проблема в том, что amount должен быть байтового типа, но у вас есть нормальная строка \u8463. Библиотека, которую вы упомянули, что вы использовали для генерации случайных данных, создает байтовую строку, используя кодировку java по умолчанию: https://github.com/confluentinc/avro-random-generator/blob/master/src/main/java/io/confluent/avro/random/generator/Generator.java#L373

Однако, возможно, по умолчанию это не iso-8859-1, как в javaреализация (эталонная реализация) использует: https://github.com/apache/avro/blob/bf47ec97e0b7f5701042fac067b73b421a9177b7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java#L220

...