Контекст
Я кодировал пару небольших Kafka Connect разъемов.Один, который генерирует случайные данные каждую секунду, а другой, который регистрирует их в консоли.Они интегрированы с Registry Schema , поэтому данные сериализуются с помощью Avro .
. Я развернул их в локальной среде Kafka с помощью fast-data-dev Docker, предоставленное Landoop
Базовая настройка работает и каждую секунду регистрируется сообщение, которое регистрируется
Однако я хочу изменить стратегию имени субъекта .По умолчанию создается два объекта:
${topic}-key
${topic}-value
В соответствии с моим вариантом использования, мне нужно будет генерировать событияс различными схемами, которые в конечном итоге на одну и ту же тему.Поэтому названия предметов, которые мне нужны:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
Согласно документам ,мои потребности вписываются в TopicRecordNameStrategy
Что я пробовал
Я создаю avroData
объект для отправки значений для подключения:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
и использовать его впоследствии для создания SourceRecord
объектов ответа
В документации говорится, что для использования реестра схем в Kafka Connect необходимо установить некоторые свойствав конфиге коннектора.Поэтому, когда я его создаю, я добавляю их:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Проблема
Соединитель игнорирует эти свойства и продолжает использовать старые ${topic}-key
и ${topic}-value
предметы.
Вопрос
Kafka Connect должен поддерживать различные предметные стратегии.Мне удалось обойти эту проблему, написав собственную версию AvroConverter
и жестко запрограммировав, что мне нужна предметная стратегия.Однако это не выглядит хорошим подходом и также вызывает проблемы при попытке использовать данные с помощью Sink Kafka Connector.Я продублировал тему, так что есть версия со старым именем (${topic}-key
), и она работает
Как правильно настроить стратегию объекта для Kafka Connect?