Как я могу производить без схемы как тип avro с python? - PullRequest
0 голосов
/ 16 апреля 2020

Я использую указанный ниже код и отправляю сообщение кафке. Это работает.

Но я хочу отправить сообщение без схемы, потому что у меня есть схема на kafka topi c. Я регистрирую это первым. Я не хочу отправлять схемы каждый раз.

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "type":"record",
   "name":"myrecord",
   "fields":[
      {
         "name":"id",
         "type":[
            "null",
            "int"
         ],
         "default":null
      },
      {
         "name":"product",
         "type":[
            "null",
            "string"
         ],
         "default":null
      },
      {
         "name":"quantity",
         "type":[
            "null",
            "int"
         ],
         "default":null
      },
      {
         "name":"price",
         "type":[
            "null",
            "int"
         ],
         "default":null
      }
   ]
}
"""

key_schema_str = """
{
   "type":"record",
   "name":"key_schema",
   "fields":[
      {
         "name":"id",
         "type":"int"
      }
   ]
}
"""


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
    #value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}


    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    avroProducer.produce(topic='orders', key=key)
    avroProducer.flush()t

заранее спасибо

Ответы [ 2 ]

2 голосов
/ 17 апреля 2020

Я не хочу отправлять схему каждый раз.

Avro требуется схема. Полная остановка

У меня есть схема на kafka topi c

В темах Kafka нет схем. Я полагаю, вы имеете в виду схему в реестре? Затем вы должны получить это, прежде чем использовать его в вашем производителе

from confluent_kafka.avro import CachedSchemaRegistryClient
sr_client = CachedSchemaRegistryClient({'url': "http://10.0.0.0:8081"})

Затем используйте клиент для выполнения get_schema вызова

0 голосов
/ 17 апреля 2020

Использование confluent-kafka-python

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({"url": "http://localhost:8081"})

value_schema = sr.get_latest_schema("orders-value")[1]
key_schema= sr.get_latest_schema("orders-key")[1]

Использование SchemaRegistryClient

# pip install python-schema-registry-client
from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient('localhost:8081')
value_schema = sr.get_schema('orders-value', version='latest').schema
key_schema = sr.get_schema('orders-key', version='latest').schema

И наконец:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

if __name__ == '__main__':
    value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
    key = {"id": 1}


    avroProducer = AvroProducer({
        'bootstrap.servers': '10.0.0.0:9092',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://10.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

    avroProducer.produce(topic='orders', key=key, value=value)
    avroProducer.flush()
...