Я не могу создать данные для конкретной схемы и не могу понять, почему. Данные примера, включенные в код в качестве словаря, были созданы непосредственно с использованием конфлюэнтного «avro-random-generator», поэтому данные примера должны быть правильными, поскольку они напрямую получены из схемы. И реестр схемы, и генератор случайных чисел Avro являются инструментами слияния, поэтому их инструменты не могут создавать примеры данных, которые не работают с реестром их схемы.
Это схема:
{
"type": "record",
"name": "schemaV1",
"namespace": "net.avro.schemaV1",
"doc": "",
"fields": [
{
"name": "orderId",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "offerId",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "redeemerId",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "eventCancellationType",
"type": "int",
"doc": ""
},
{
"name": "ruleIds",
"type": {
"type": "array",
"items": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
}
},
{
"name": "eventOriginator",
"type": {
"type": "record",
"name": "AvroEventPartnerV1",
"doc": "",
"fields": [
{
"name": "partnerShortName",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "businessUnitShortName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "branchShortName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
}
]
}
},
{
"name": "roundedDelta",
"doc": "",
"type": {
"type": "record",
"name": "AvroAmountV1",
"doc": "Amount with a currency",
"fields": [
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 21,
"scale": 3
},
"doc": "The amount as a decimal number"
},
{
"name": "currency",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
}
]
}
},
{
"name": "rewardableLegalDelta",
"type": [
"null",
"AvroAmountV1"
],
"doc": "",
"default": null
},
{
"name": "receiptNumber",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "referenceReceiptNumber",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "eventEffectiveTime",
"type": {
"type": "long"
},
"doc": ""
}
]
}
Это мой сценарий:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, ClientError, ValueSerializerError
BOOTSTRAP_SERVER = 'localhost:9092'
SCHEMA_REGISTRY = 'http://localhost:8081'
TOPIC = 'topicV1'
SCHEMA_PATH = 'schemas/schemaV1.avsc'
def schemaReader(SCHEMA_PATH):
with open(SCHEMA_PATH, 'r') as file:
data = file.read()
return data
def main():
kafka_config = {
'bootstrap.servers': BOOTSTRAP_SERVER,
'schema.registry.url': SCHEMA_REGISTRY
}
value_schema = avro.loads( schemaReader(SCHEMA_PATH) )
null = None
value = {
"orderId": "a9bcc55f-e2c0-43d6-b793-ff5f295d051d",
"offerId": "119475017578242889",
"redeemerId": "1176a01b-b2dc-45a9-91cc-232361e14f99",
"eventCancellationType": 0,
"ruleIds": ["ID-IPM00001"],
"eventOriginator": {"partnerShortName":
"partner","businessUnitShortName": null,"branchShortName": null},
"roundedDelta": {"amount": "\u8463","currency": "PTS"},
"rewardableLegalDelta": {"amount": "\u8463","currency": "EUR"},
"receiptNumber": "19b2ff68-ed06-48f0-9ce9-d697c0eadc19",
"referenceReceiptNumber": null,
"eventEffectiveTime": 1569494696656
}
avroProducer = AvroProducer(kafka_config, default_value_schema=value_schema )
avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
avroProducer.flush()
if __name__== "__main__":
main()
Это трассировка, которую я получаю:
File "producer.py", line 64, in <module>
main()
File "producer.py", line 60, in main
avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 115, in encode_record_with_schema
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 149, in encode_record_with_schema_id
writer(record, outf)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
File "/apps/python/python2.7/lib/python2.7/site-packages/avro/io.py", line 1042, in write
raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'..'} is not an example of the schema { ..}