Мне трудно реализовать функцию, в которой я могу иметь динамическое значение сообщения для Кафки. Я использую AvroProducer
из confluent-kafka-python
вместе с schema registry
. Производитель отправит сообщение в следующем формате:
{'id':1, 'name':'A', 'properties': {'key1': 'value1', 'key2': 'value2', 'key2': 'value3'}},
{'id': 2, 'name': 'X', 'properties': {'key1': 'value1'}}
properties
может варьироваться между сообщениями. Таким образом, некоторые могут иметь больше пар ключ-значение, а некоторые могут иметь меньше. И я пытаюсь получить это сообщение от Kafka на postgresql
, используя kafka connect. Я хочу, чтобы properties
было json
, введите postgresql
базу данных.
Как этого достичь? Любые указатели будут по достоинству оценены. Благодарю.