Confluent-Kafka: противоречия в сериализации Avro с обработкой схем в потребителях Python - PullRequest
0 голосов
/ 24 октября 2018

Я пытаюсь понять Avro Serialization на Confluent Kafka вместе с использованием реестра Schema.До конца все шло хорошо, но последние ожидания от AVRO внушали мне много недоразумений.Согласно моему пониманию и пониманию, Avro Serialization дает нам гибкость, что, когда у нас есть изменение в схеме, мы можем просто управлять этим, не влияя на старого производителя / потребителя.

Следуя тому же, я разработал Pythonпроизводитель, который проверит существование схемы в Schema-Registry, если она отсутствует, создайте ее и начните создавать сообщения json, показанные ниже.Когда мне нужно изменить схему, я просто обновляю ее в своем производителе, и это создает сообщения с новой схемой.

Моя старая схема:

data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

Образец данных от источника-1:

{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}

Моя новая схема:

data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

Образец данных от Producer-2:

{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}

Случай 1: когда у меня работают 2 производителя с двумя вышеуказанными схемами, работающими вместеЯ могу успешно использовать сообщение с кодом ниже.Пока все хорошо.

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        xxxxx 
        break
    print msg.value()

Случай 2: Когда я немного углубляюсь в области JSON, все смешивается и ломается.

Сначала скажем, что у меня работает один продюсер с 'Моя старая схема »выше и один потребитель успешно использует эти сообщения.

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

Когда я запускаю второго производителя с« Моей новой схемой », упомянутой выше, мои старые потребители ломаются, так как нет полей Passport_expiry_date и passport_make_date, которыеВерно.

Вопрос:

Иногда я думаю, что это ожидаемо, поскольку я (Разработчик) использует имена полей, которых нет в Послании.Но как Авро может помочь здесь?Разве недостающее поле не должно обрабатываться Avro?Я видел примеры в JAVA, где эта ситуация была обработана должным образом, но не нашел ни одного примера в Python.Например, ниже GitHub имеет прекрасный пример обработки этого сценария.Когда поле отсутствует, Потребитель просто печатает «Нет».

https://github.com/LearningJournal/ApacheKafkaTutorials

Случай 3: Когда я запускаю комбинации, такие как Старый производитель со старым потребителем, а затем в других терминалах Новый производительс новым потребителем, производители / потребители смешиваются и вещи ломаются, говоря, что нет поля json.

старый потребитель ==>

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

новый потребитель ==>

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]

Вопрос:

Опять я думаю, это ожидаемо.Но тогда Avro заставляет меня думать, что правильный потребитель должен получить правильное сообщение с правильной схемой.Если я использую msg.value () и всегда анализирую поля на стороне потребителя, используя программирование без какой-либо роли Avro, то в чем преимущество использования avro?В чем преимущество отправки схемы с сообщениями / хранения в SR?

Наконец, есть ли способ проверить схему, прикрепленную к сообщению?Я понимаю, что в Avro идентификатор схемы прилагается к сообщению, которое в дальнейшем используется в реестре схем при чтении и записи сообщений.Но я никогда не вижу это с сообщениями.

Большое спасибо в Advance.

1 Ответ

0 голосов
/ 26 октября 2018

Непонятно, какую настройку совместимости вы используете в реестре, но я буду предполагать в обратном направлении, что означает, что вам нужно было бы добавить поле со значением по умолчанию.

Звучит так, как будто вы получаете Python KeyError, потому что эти ключи не существуют.

Вместо msg.value()["non-existing-key"] вы можете попробовать

вариант 1: рассматривать его как dict()

msg.value().get("non-existing-key", "Default value")

вариант 2: проверить отдельно все ключи, которыеможет отсутствовать

some_var = None  # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
    some_var = "Default Value"

В противном случае вы должны «проецировать» более новую схему на более старые данные, что и делает код Java, используя подкласс SpecificRecord.Таким образом, более старые данные будут проанализированы с более новой схемой, которая имеет более новые поля с их значениями по умолчанию.

Если вместо этого вы используете GenericRecord в Java, у вас будут аналогичные проблемы.Я не уверен, что в Python есть эквивалент SpecificRecord в Java.

Кстати, я не думаю, что строка "None" может быть применена для logicalType=timestamp

...