может ли confluent-python создавать данные со значением в avro и ключом в строке? - PullRequest
0 голосов
/ 27 февраля 2019

Я использую python3 и confluent-python , чтобы отправить сообщение Кафке.Мне нужно отправить данные со значением в Avro и введите строку.Но я обнаружил, что Confluent-Python может отправлять только в Avro или оба в строке.Исходный код confluent-python выглядит так:

def produce(self, **kwargs):
    """
        Asynchronously sends a message to Kafka by encoding with specified or default Avro schema.

        :param str topic: topic name
        :param object value: An object to serialize
        :param str value_schema: Avro schema for value
        :param object key: An object to serialize
        :param str key_schema: Avro schema for key

        Plus any other parameters accepted by confluent_kafka.Producer.produce

        :raises SerializerError: On serialization failure
        :raises BufferError: If producer queue is full.
        :raises KafkaException: For other produce failures.
    """
    # get schemas from  kwargs if defined
    key_schema = kwargs.pop('key_schema', self._key_schema)
    value_schema = kwargs.pop('value_schema', self._value_schema)
    topic = kwargs.pop('topic', None)
    if not topic:
        raise ClientError("Topic name not specified.")
    value = kwargs.pop('value', None)
    key = kwargs.pop('key', None)

    if value is not None:
        if value_schema:
            value = self._serializer.encode_record_with_schema(topic, value_schema, value)
        else:
            raise ValueSerializerError("Avro schema required for values")

    if key is not None:
        if key_schema:
            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
        else:
            raise KeySerializerError("Avro schema required for key")

    super(AvroProducer, self).produce(topic, value, key, **kwargs)

Кто-нибудь знает?

1 Ответ

0 голосов
/ 23 мая 2019

Так что мой обходной путь - просто изменить код Python, чтобы не вызывать исключение.Я предполагаю, что авторы библиотеки не допускают гибкости использования схемы только для ключа или только для значения по причине, но не знают, что это такое.Для моего случая использования необходимости публикации данных такого типа в процессе разработки я думал, что это исправлено.

Изменение кода в confluent_kafka/avro/__init__.py и просто удаление строк 87 и 88:

84    if key is not None:
85        if key_schema:
86            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
87        else:
88            raise KeySerializerError("Avro schema required for key")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...