Регистрация идентификатора схемы с помощью Topi c с использованием confluent_kafka для python - PullRequest
1 голос
/ 05 января 2020

Единственный ответ, который я получил до сих пор, это то, что вы должны дать схеме и топикам c одно и то же имя, и тогда это должно связать их вместе. Но после регистрации схемы с именем test_topic, например:

{
  "type": "record",
  "name": "test_topic",
  "namespace": "com.test",
  "doc": "My test schema",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

и выполнения следующей команды, она вставляет без проблем.

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"

Но когда я запускаю следующую команду как ну он вставляет без каких-либо ошибок (обратите внимание, я изменил имя свойства)

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"

Я бы подозревал сообщение об ошибке, говорящее, что мои данные не соответствуют схеме для этой топи c ...

Мой идентификатор схемы равен 10, поэтому я знаю, что он работает и зарегистрирован, но на данный момент не очень полезен.

Python Код:

from confluent_kafka import Producer
import socket
import json
conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)

def acked(err, msg):
    if err is not None:
        print(f'Failed to deliver message: {str(msg)}, {str(err)}')
    else:
        print(f'Message produced: {str(msg)}')

    producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii') , callback=acked)

producer.poll(5)

1 Ответ

2 голосов
/ 05 января 2020

вы должны присвоить схеме и топи c одно и то же имя, и тогда это должно связать их вместе

Это не совсем так, как работает реестр схем.

Каждая запись кафки имеет ключ и значение.

В Реестре есть темы, которые не привязаны строго к темам.

Однако реализация сериализатора клиентов (de) Kafka будет использовать имена субъектов topic-key и topic-value для регистрации / извлечения схем из реестра.

Клиенты не могут указать реестру, какой идентификатор поместить схему в. Этот лог c рассчитан на стороне сервера


Я не уверен, что понимаю, что ваше сообщение связано с REST Proxy, но вы публикуете простой JSON и не говорите, что данные должны быть Avro (вы используете неправильный заголовок)

Если используется Avro, тип контента будет application/vnd.kafka.avro.v2+json

...