Kafka Connect не работает с предметными стратегиями - PullRequest
0 голосов
/ 28 ноября 2018

Контекст

Я кодировал пару небольших Kafka Connect разъемов.Один, который генерирует случайные данные каждую секунду, а другой, который регистрирует их в консоли.Они интегрированы с Registry Schema , поэтому данные сериализуются с помощью Avro .

. Я развернул их в локальной среде Kafka с помощью fast-data-dev Docker, предоставленное Landoop

Базовая настройка работает и каждую секунду регистрируется сообщение, которое регистрируется

Однако я хочу изменить стратегию имени субъекта .По умолчанию создается два объекта:

  • ${topic}-key
  • ${topic}-value

В соответствии с моим вариантом использования, мне нужно будет генерировать событияс различными схемами, которые в конечном итоге на одну и ту же тему.Поэтому названия предметов, которые мне нужны:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

Согласно документам ,мои потребности вписываются в TopicRecordNameStrategy

Что я пробовал

Я создаю avroData объект для отправки значений для подключения:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

и использовать его впоследствии для создания SourceRecord объектов ответа

В документации говорится, что для использования реестра схем в Kafka Connect необходимо установить некоторые свойствав конфиге коннектора.Поэтому, когда я его создаю, я добавляю их:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

Проблема

Соединитель игнорирует эти свойства и продолжает использовать старые ${topic}-key и ${topic}-value предметы.

Вопрос

Kafka Connect должен поддерживать различные предметные стратегии.Мне удалось обойти эту проблему, написав собственную версию AvroConverter и жестко запрограммировав, что мне нужна предметная стратегия.Однако это не выглядит хорошим подходом и также вызывает проблемы при попытке использовать данные с помощью Sink Kafka Connector.Я продублировал тему, так что есть версия со старым именем (${topic}-key), и она работает

Как правильно настроить стратегию объекта для Kafka Connect?

1 Ответ

0 голосов
/ 28 ноября 2018

Вам не хватает префикса key.converter и value.converter для передачи конфигурации на конвектор.Поэтому вместо:

key.subject.name.strategy
value.subject.name.strategy

вы хотите:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

Источник https://docs.confluent.io/current/connect/managing/configuring.html:

Для передачи параметров конфигурации в преобразователи ключа и значениядобавьте к ним префикс key.converter. или value.converter., как в рабочей конфигурации при определении конвертеров по умолчанию.Обратите внимание, что они используются только в том случае, если соответствующая конфигурация преобразователя указана в свойствах key.converter или value.converter.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...