Как обработать динамическое значение сообщения в Кафке? - PullRequest
0 голосов
/ 14 октября 2019

Мне трудно реализовать функцию, в которой я могу иметь динамическое значение сообщения для Кафки. Я использую AvroProducer из confluent-kafka-python вместе с schema registry. Производитель отправит сообщение в следующем формате:

{'id':1, 'name':'A', 'properties': {'key1': 'value1', 'key2': 'value2', 'key2': 'value3'}},
{'id': 2, 'name': 'X', 'properties': {'key1': 'value1'}} 

properties может варьироваться между сообщениями. Таким образом, некоторые могут иметь больше пар ключ-значение, а некоторые могут иметь меньше. И я пытаюсь получить это сообщение от Kafka на postgresql, используя kafka connect. Я хочу, чтобы properties было json, введите postgresql базу данных.

Как этого достичь? Любые указатели будут по достоинству оценены. Благодарю.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...