Я пытаюсь понять Avro Serialization на Confluent Kafka вместе с использованием реестра Schema.До конца все шло хорошо, но последние ожидания от AVRO внушали мне много недоразумений.Согласно моему пониманию и пониманию, Avro Serialization дает нам гибкость, что, когда у нас есть изменение в схеме, мы можем просто управлять этим, не влияя на старого производителя / потребителя.
Следуя тому же, я разработал Pythonпроизводитель, который проверит существование схемы в Schema-Registry, если она отсутствует, создайте ее и начните создавать сообщения json, показанные ниже.Когда мне нужно изменить схему, я просто обновляю ее в своем производителе, и это создает сообщения с новой схемой.
Моя старая схема:
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'
Образец данных от источника-1:
{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}
Моя новая схема:
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'
Образец данных от Producer-2:
{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}
Случай 1: когда у меня работают 2 производителя с двумя вышеуказанными схемами, работающими вместеЯ могу успешно использовать сообщение с кодом ниже.Пока все хорошо.
while True:
try:
msg = c.poll(10)
except SerializerError as e:
xxxxx
break
print msg.value()
Случай 2: Когда я немного углубляюсь в области JSON, все смешивается и ломается.
Сначала скажем, что у меня работает один продюсер с 'Моя старая схема »выше и один потребитель успешно использует эти сообщения.
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
Когда я запускаю второго производителя с« Моей новой схемой », упомянутой выше, мои старые потребители ломаются, так как нет полей Passport_expiry_date и passport_make_date, которыеВерно.
Вопрос:
Иногда я думаю, что это ожидаемо, поскольку я (Разработчик) использует имена полей, которых нет в Послании.Но как Авро может помочь здесь?Разве недостающее поле не должно обрабатываться Avro?Я видел примеры в JAVA, где эта ситуация была обработана должным образом, но не нашел ни одного примера в Python.Например, ниже GitHub имеет прекрасный пример обработки этого сценария.Когда поле отсутствует, Потребитель просто печатает «Нет».
https://github.com/LearningJournal/ApacheKafkaTutorials
Случай 3: Когда я запускаю комбинации, такие как Старый производитель со старым потребителем, а затем в других терминалах Новый производительс новым потребителем, производители / потребители смешиваются и вещи ломаются, говоря, что нет поля json.
старый потребитель ==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
новый потребитель ==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]
Вопрос:
Опять я думаю, это ожидаемо.Но тогда Avro заставляет меня думать, что правильный потребитель должен получить правильное сообщение с правильной схемой.Если я использую msg.value () и всегда анализирую поля на стороне потребителя, используя программирование без какой-либо роли Avro, то в чем преимущество использования avro?В чем преимущество отправки схемы с сообщениями / хранения в SR?
Наконец, есть ли способ проверить схему, прикрепленную к сообщению?Я понимаю, что в Avro идентификатор схемы прилагается к сообщению, которое в дальнейшем используется в реестре схем при чтении и записи сообщений.Но я никогда не вижу это с сообщениями.
Большое спасибо в Advance.