Я использую python3 и confluent-python , чтобы отправить сообщение Кафке.Мне нужно отправить данные со значением в Avro и введите строку.Но я обнаружил, что Confluent-Python может отправлять только в Avro или оба в строке.Исходный код confluent-python выглядит так:
def produce(self, **kwargs):
"""
Asynchronously sends a message to Kafka by encoding with specified or default Avro schema.
:param str topic: topic name
:param object value: An object to serialize
:param str value_schema: Avro schema for value
:param object key: An object to serialize
:param str key_schema: Avro schema for key
Plus any other parameters accepted by confluent_kafka.Producer.produce
:raises SerializerError: On serialization failure
:raises BufferError: If producer queue is full.
:raises KafkaException: For other produce failures.
"""
# get schemas from kwargs if defined
key_schema = kwargs.pop('key_schema', self._key_schema)
value_schema = kwargs.pop('value_schema', self._value_schema)
topic = kwargs.pop('topic', None)
if not topic:
raise ClientError("Topic name not specified.")
value = kwargs.pop('value', None)
key = kwargs.pop('key', None)
if value is not None:
if value_schema:
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
else:
raise ValueSerializerError("Avro schema required for values")
if key is not None:
if key_schema:
key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
else:
raise KeySerializerError("Avro schema required for key")
super(AvroProducer, self).produce(topic, value, key, **kwargs)
Кто-нибудь знает?