Единственный ответ, который я получил до сих пор, это то, что вы должны дать схеме и топикам c одно и то же имя, и тогда это должно связать их вместе. Но после регистрации схемы с именем test_topic
, например:
{
"type": "record",
"name": "test_topic",
"namespace": "com.test",
"doc": "My test schema",
"fields": [
{
"name": "name",
"type": "string"
}
]
}
и выполнения следующей команды, она вставляет без проблем.
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"
Но когда я запускаю следующую команду как ну он вставляет без каких-либо ошибок (обратите внимание, я изменил имя свойства)
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"
Я бы подозревал сообщение об ошибке, говорящее, что мои данные не соответствуют схеме для этой топи c ...
Мой идентификатор схемы равен 10, поэтому я знаю, что он работает и зарегистрирован, но на данный момент не очень полезен.
Python Код:
from confluent_kafka import Producer
import socket
import json
conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)
def acked(err, msg):
if err is not None:
print(f'Failed to deliver message: {str(msg)}, {str(err)}')
else:
print(f'Message produced: {str(msg)}')
producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii') , callback=acked)
producer.poll(5)