Фаставро - конвертировать из байта в двойной - PullRequest
0 голосов
/ 02 октября 2018

Я написал скрипт для преобразования диапазона файлов avro с различными схемами из байтов логического типа, десятичного логического типа в двойной.Во всех схемах я изменяю:

'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
         'precision': 20,
         'connect.parameters': {'scale': '8',
          'connect.decimal.precision': '20'},
         'scale': 8,
         'logicalType': 'decimal',
         'type': 'bytes',
         'connect.version': 1}}]}

на просто:

'type':'double'

И в остальном сохраняю схему неизменной.

Затем я использую следующий код:

 with open(input_file, 'rb') as f_in:
       records = list(fastavro.reader(f_in))


#Reserialize and save locally
with open(output_file, 'wb') as out:
    fastavro.writer(out, fixed_schema, records)

Для десериализации и повторной сериализации файлов Avro (фиксированная схема идентична схеме, за исключением ранее упомянутой корректировки с байтов на удвоение.

Для 3 схем это работает отлично - fastavroправильно читает записи типа байтов как десятичные числа, которые он может записать в тип double. В четвертом случае я получаю следующую ошибку. Я публикую ниже схемы (до и после).

ValueError: {'NSG': {'apr': Decimal('0.08325675'), 'interestRate': 
Decimal('0.05080000'), 'fee': Decimal('950.00000000'), 'monthlyPayment': 
Decimal('378.57427442')}, 'SG': {'apr': Decimal('0.08325675'), ' 
'interestRate': Decimal('0.05080000'), 'fee': Decimal('950.00000000'), 
'monthlyPayment': Decimal('378.57427442')}} (type <class 'dict'>) do not 
 match ['null', {'values': {'type': 'record', 'connect.name': 
 'com.zopa.events.origination.quotation.Price', '__fastavro_parsed': True, 
 'name': 'com.zopa.events.origination.quotation.Price', 'fields': [{'type': 
 'double', 'name': 'apr'}, {'type': 'double', 'name': 'fee'}, {'type': 
 'double', 'name': 'interestRate'}, {'type': 'double', 'name': 
 'monthlyPayment'}]}, 'type': 'map', 'connect.parameters': 
 {'avro.java.string': 'String'}, 'avro.java.string': 'String'}]

Что здесь не так? В этом примере, если я обновляю схему для quotes.amount, fastavro может писать без проблем, но если я также обновляю для quotes.prices.sg.apr, я получаю сообщение об ошибке ниже, дажехотя раньше поля были одного и того же типа. Вы сможете повторить это с одной десериализованной строкой, опубликованной ниже. какие изменения мне нужно внести в схему иecords для создания нового файла avro ?

Схема до:

  {'connect.name': 'com.xxxxx.events.origination.quotation.QuotesCreated',
 'fields': [{'doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
   'name': 'header',
   'type': {'connect.name': 'com.xxxxx.events.Header',
    'fields': [{'name': 'recordId',
      'type': {'avro.java.string': 'String',
       'type': 'string',
       'connect.parameters': {'avro.java.string': 'String'}}},
     {'default': [],
      'name': 'causationIds',
      'type': {'type': 'array',
       'items': {'avro.java.string': 'String',
        'type': 'string',
        'connect.parameters': {'avro.java.string': 'String'}},
       'connect.default': []}},
     {'name': 'correlationId',
      'type': {'avro.java.string': 'String',
       'type': 'string',
       'connect.parameters': {'avro.java.string': 'String'}}},
     {'name': 'eventTime',
      'type': {'connect.name': 'org.apache.kafka.connect.data.Timestamp',
       'logicalType': 'timestamp-millis',
       'type': 'long',
       'connect.version': 1}},
     {'default': {},
      'name': 'otherMetadata',
      'type': {'values': {'avro.java.string': 'String',
        'type': 'string',
        'connect.parameters': {'avro.java.string': 'String'}},
       'avro.java.string': 'String',
       'type': 'map',
       'connect.default': {},
       'connect.parameters': {'avro.java.string': 'String'}}}],
    'connect.parameters': {'connect.record.doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'},
    'namespace': 'com.xxxxx.events',
    'doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
    'name': 'Header',
    'type': 'record',
    'connect.doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'}},
  {'name': 'channelCode',
   'type': {'avro.java.string': 'String',
    'type': 'string',
    'connect.parameters': {'avro.java.string': 'String'}}},
  {'doc': 'ID of a borrower application, will be reused across various business flows',
   'name': 'applicationId',
   'type': {'connect.doc': 'ID of a borrower application, will be reused across various business flows',
    'avro.java.string': 'String',
    'type': 'string',
    'connect.parameters': {'avro.java.string': 'String'}}},
  {'doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
   'name': 'requestId',
   'type': {'connect.doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
    'avro.java.string': 'String',
    'type': 'string',
    'connect.parameters': {'avro.java.string': 'String'}}},
  {'doc': 'List of quotes priced for this borrower application',
   'name': 'quotes',
   'type': {'connect.doc': 'List of quotes priced for this borrower application',
    'type': 'array',
    'items': {'connect.name': 'com.xxxxx.events.origination.quotation.PricedQuote',
     'name': 'PricedQuote',
     'type': 'record',
     'fields': [{'name': 'id',
       'type': {'avro.java.string': 'String',
        'type': 'string',
        'connect.parameters': {'avro.java.string': 'String'}}},
      {'name': 'amount',
       'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
        'precision': 20,
        'connect.parameters': {'scale': '8',
         'connect.decimal.precision': '20'},
        'scale': 8,
        'logicalType': 'decimal',
        'type': 'bytes',
        'connect.version': 1}},
      {'name': 'term', 'type': 'int'},
      {'default': False,
       'name': 'isRequestedQuote',
       'type': {'type': 'boolean', 'connect.default': False}},
      {'default': None,
       'name': 'riskScore',
       'type': ['null',
        {'connect.name': 'com.xxxxx.events.origination.quotation.RiskScore',
         'name': 'RiskScore',
         'type': 'record',
         'fields': [{'name': 'score', 'type': 'float'},
          {'default': 'UNKNOWN',
           'name': 'riskMarket',
           'type': {'connect.name': 'com.xxxxx.events.origination.decisioning.RiskMarket',
            'type': 'string',
            'connect.default': 'UNKNOWN',
            'connect.parameters': {'io.confluent.connect.avro.Enum.ASTAR': 'ASTAR',
             'io.confluent.connect.avro.Enum.A1': 'A1',
             'io.confluent.connect.avro.Enum': 'com.xxxxx.events.origination.decisioning.RiskMarket',
             'io.confluent.connect.avro.Enum.D': 'D',
             'io.confluent.connect.avro.Enum.A2': 'A2',
             'io.confluent.connect.avro.Enum.UNKNOWN': 'UNKNOWN',
             'connect.enum.doc': None,
             'io.confluent.connect.avro.Enum.N': 'N',
             'io.confluent.connect.avro.Enum.B': 'B',
             'io.confluent.connect.avro.Enum.E': 'E',
             'io.confluent.connect.avro.Enum.C1': 'C1'}}}]}]},
      {'default': None,
       'name': 'affordability',
       'type': ['null',
        {'connect.name': 'com.xxxxx.events.origination.quotation.Affordability',
         'name': 'Affordability',
         'type': 'record',
         'fields': [{'name': 'DI', 'type': 'float'},
          {'name': 'DTI', 'type': 'float'},
          {'name': 'HDTI', 'type': 'float'},
          {'name': 'LDTI', 'type': 'float'}]}]},
      {'default': None,
       'name': 'prices',
       'type': ['null',
        {'values': {'connect.name': 'com.xxxxx.events.origination.quotation.Price',
          'name': 'Price',
          'type': 'record',
          'fields': [{'name': 'apr',
            'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
             'precision': 20,
             'connect.parameters': {'scale': '8',
              'connect.decimal.precision': '20'},
             'scale': 8,
             'logicalType': 'decimal',
             'type': 'bytes',
             'connect.version': 1}},
           {'name': 'fee',
            'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
             'precision': 20,
             'connect.parameters': {'scale': '8',
              'connect.decimal.precision': '20'},
             'scale': 8,
             'logicalType': 'decimal',
             'type': 'bytes',
             'connect.version': 1}},
           {'name': 'interestRate',
            'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
             'precision': 20,
             'connect.parameters': {'scale': '8',
              'connect.decimal.precision': '20'},
             'scale': 8,
             'logicalType': 'decimal',
             'type': 'bytes',
             'connect.version': 1}},
           {'name': 'monthlyPayment',
            'type': {'connect.name': 'org.apache.kafka.connect.data.Decimal',
             'precision': 20,
             'connect.parameters': {'scale': '8',
              'connect.decimal.precision': '20'},
             'scale': 8,
             'logicalType': 'decimal',
             'type': 'bytes',
             'connect.version': 1}}]},
         'avro.java.string': 'String',
         'type': 'map',
         'connect.parameters': {'avro.java.string': 'String'}}]}]}}}],
 'namespace': 'com.xxxxx.events.origination.quotation',
 'name': 'QuotesCreated',
 'type': 'record',
 'connect.version': 3}

Схема после:

{'connect.name': 'com.xxxxx.events.origination.quotation.QuotesCreated',
 'fields': [{'doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
   'name': 'header',
   'type': {'connect.name': 'com.xxxxx.events.Header',
    'fields': [{'name': 'recordId',
      'type': {'avro.java.string': 'String',
       'type': 'string',
       'connect.parameters': {'avro.java.string': 'String'}}},
     {'default': [],
      'name': 'causationIds',
      'type': {'type': 'array',
       'items': {'avro.java.string': 'String',
        'type': 'string',
        'connect.parameters': {'avro.java.string': 'String'}},
       'connect.default': []}},
     {'name': 'correlationId',
      'type': {'avro.java.string': 'String',
       'type': 'string',
       'connect.parameters': {'avro.java.string': 'String'}}},
     {'name': 'eventTime',
      'type': {'connect.name': 'org.apache.kafka.connect.data.Timestamp',
       'logicalType': 'timestamp-millis',
       'type': 'long',
       'connect.version': 1}},
     {'default': {},
      'name': 'otherMetadata',
      'type': {'values': {'avro.java.string': 'String',
        'type': 'string',
        'connect.parameters': {'avro.java.string': 'String'}},
       'avro.java.string': 'String',
       'type': 'map',
       'connect.default': {},
       'connect.parameters': {'avro.java.string': 'String'}}}],
    'connect.parameters': {'connect.record.doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'},
    'namespace': 'com.xxxxx.events',
    'doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md',
    'name': 'Header',
    'type': 'record',
    'connect.doc': 'This file represents a standardised Xxxxx header that\r\n    would be expected on each message we emit to Kafka.\r\n\r\n    Check out this link for the human definition and reasoning\r\n    https://github.com/xxxxxUK/rfc/blob/master/correlating_requests.md'}},
  {'name': 'channelCode',
   'type': {'avro.java.string': 'String',
    'type': 'string',
    'connect.parameters': {'avro.java.string': 'String'}}},
  {'doc': 'ID of a borrower application, will be reused across various business flows',
   'name': 'applicationId',
   'type': {'connect.doc': 'ID of a borrower application, will be reused across various business flows',
    'avro.java.string': 'String',
    'type': 'string',
    'connect.parameters': {'avro.java.string': 'String'}}},
  {'doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
   'name': 'requestId',
   'type': {'connect.doc': 'ID for a borrower quote request, will be new between retries in a single borrower application',
    'avro.java.string': 'String',
    'type': 'string',
    'connect.parameters': {'avro.java.string': 'String'}}},
  {'doc': 'List of quotes priced for this borrower application',
   'name': 'quotes',
   'type': {'connect.doc': 'List of quotes priced for this borrower application',
    'type': 'array',
    'items': {'connect.name': 'com.xxxxx.events.origination.quotation.PricedQuote',
     'name': 'PricedQuote',
     'type': 'record',
     'fields': [{'name': 'id',
       'type': {'avro.java.string': 'String',
        'type': 'string',
        'connect.parameters': {'avro.java.string': 'String'}}},
      {'name': 'amount', 'type': 'double'},
      {'name': 'term', 'type': 'int'},
      {'default': False,
       'name': 'isRequestedQuote',
       'type': {'type': 'boolean', 'connect.default': False}},
      {'default': None,
       'name': 'riskScore',
       'type': ['null',
        {'connect.name': 'com.xxxxx.events.origination.quotation.RiskScore',
         'name': 'RiskScore',
         'type': 'record',
         'fields': [{'name': 'score', 'type': 'float'},
          {'default': 'UNKNOWN',
           'name': 'riskMarket',
           'type': {'connect.name': 'com.xxxxx.events.origination.decisioning.RiskMarket',
            'type': 'string',
            'connect.default': 'UNKNOWN',
            'connect.parameters': {'io.confluent.connect.avro.Enum.ASTAR': 'ASTAR',
             'io.confluent.connect.avro.Enum.A1': 'A1',
             'io.confluent.connect.avro.Enum': 'com.xxxxx.events.origination.decisioning.RiskMarket',
             'io.confluent.connect.avro.Enum.D': 'D',
             'io.confluent.connect.avro.Enum.A2': 'A2',
             'io.confluent.connect.avro.Enum.UNKNOWN': 'UNKNOWN',
             'connect.enum.doc': None,
             'io.confluent.connect.avro.Enum.N': 'N',
             'io.confluent.connect.avro.Enum.B': 'B',
             'io.confluent.connect.avro.Enum.E': 'E',
             'io.confluent.connect.avro.Enum.C1': 'C1'}}}]}]},
      {'default': None,
       'name': 'affordability',
       'type': ['null',
        {'connect.name': 'com.xxxxx.events.origination.quotation.Affordability',
         'name': 'Affordability',
         'type': 'record',
         'fields': [{'name': 'DI', 'type': 'float'},
          {'name': 'DTI', 'type': 'float'},
          {'name': 'HDTI', 'type': 'float'},
          {'name': 'LDTI', 'type': 'float'}]}]},
      {'default': None,
       'name': 'prices',
       'type': ['null',
        {'values': {'connect.name': 'com.xxxxx.events.origination.quotation.Price',
          'name': 'Price',
          'type': 'record',
          'fields': [{'name': 'apr', 'type': 'double'},
           {'name': 'fee', 'type': 'double'},
           {'name': 'interestRate', 'type': 'double'},
           {'name': 'monthlyPayment', 'type': 'double'}]},
         'avro.java.string': 'String',
         'type': 'map',
         'connect.parameters': {'avro.java.string': 'String'}}]}]}}}],
 'namespace': 'com.xxxxx.events.origination.quotation',
 'name': 'QuotesCreated',
 'type': 'record',
 'connect.version': 3}

Одна десериализованная запись:

[{'applicationId': 'aa1b6599-5b6f-e811-80c8-005056992ece',
 'channelCode': 'referrer-ppc',
 'header': {'causationIds': ['3904682c-144c-423e-bd69-1a717794ca72',
   '1adb1985-244a-415a-b105-97e0d13f2dee',
   'b5461967-fc6f-407f-ba82-f6a2a025a012',
   'cf490394-aaf6-4abd-a265-e116339b36cb',
   '7ad758c6-2faf-476b-9902-088cb549819c'],
  'correlationId': '34d2c917-e368-441e-bac1-fb0512971181',
  'eventTime': datetime.datetime(2018, 6, 13, 22, 46, 31, 204000, tzinfo=<fastavro._timezone.UTCTzinfo object at 0x7f6f53860f60>),
  'otherMetadata': {},
  'recordId': '0d2d603c-0c46-4cf4-92d2-3114a6bbc3eb'},
 'quotes': [{'affordability': {'DI': 1772.1600341796875,
    'DTI': 48.439998626708984,
    'HDTI': 60.61000061035156,
    'LDTI': 15.40999984741211},
   'amount': Decimal('15500.00000000'),
   'id': 'cd147e61-af7b-40d9-85b2-d3d0e70c0ed7',
   'isRequestedQuote': False,
   'prices': {'NSG': {'apr': Decimal('0.08325675'),
     'fee': Decimal('950.00000000'),
     'interestRate': Decimal('0.05080000'),
     'monthlyPayment': Decimal('378.57427442')},
    'SG': {'apr': Decimal('0.08325675'),
     'fee': Decimal('950.00000000'),
     'interestRate': Decimal('0.05080000'),
     'monthlyPayment': Decimal('378.57427442')}},
   'riskScore': {'riskMarket': 'A1', 'score': 0.9677764773368835},
   'term': 48},
  {'affordability': {'DI': 2036.739990234375,
    'DTI': 10.9399995803833,
    'HDTI': 47.9900016784668,
    'LDTI': 2.7799999713897705},
   'amount': Decimal('3500.00000000'),
   'id': 'cbe92ade-529b-480c-9a2b-7a41c86cfb24',
   'isRequestedQuote': False,
   'prices': {'NSG': {'apr': Decimal('0.09938296'),
     'fee': Decimal('90.00000000'),
     'interestRate': Decimal('0.08760000'),
     'monthlyPayment': Decimal('73.52774708')},
    'SG': {'apr': Decimal('0.09938296'),
     'fee': Decimal('90.00000000'),
     'interestRate': Decimal('0.08760000'),
     'monthlyPayment': Decimal('73.52774708')}},
   'riskScore': {'riskMarket': 'A2', 'score': 0.9523985981941223},
   'term': 60}],
 'requestId': 'ac1b6599-5b6f-e811-80c8-005056992ece'}
]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...