У меня проблема с отправкой объекта Avro (экземпляр org. apache .avro.specifi c .SpecificRecord) в kafka topi c с использованием соединителя источника kafka (необходимо подготовить экземпляр SourceRecord). В моем случае я предполагаю, что на основе схемы, например:
{
"namespace": "com.model.avro.generated",
"type": "record",
"name": " MessageExVal",
"version": "1",
"fields": [
{
"name": "messageSource",
"type": "string"
},
{
"name": "messageSourceVersion",
"type": [
"string",
"null"
]
}
]
}
с помощью avro-maven-plugin
для maven, я создам модель классов, используемых в моем проекте. Экземпляр класса MessageExVal
предоставляет мне «org.apache.avro.Schema
» (по методам getSchema() or getClassSchema()
). Со второй стороны kafka connect api требует от меня org.apache.kafka.connect.data.Schema
, чтобы иметь возможность создать новый экземпляр SourceRecord
, возвращаемый методом poll()
исходного соединителя. В конфигурации я предоставляю параметры:
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
В коде AvroConverter
в методе fromConnectData()
, выполняемом после метода poll, я вижу, что преобразование из org.apache.kafka.connect.data.Schema
в org.apache.avro.Schema
будет выполнено. Так есть ли возможность передать схему avro без преобразования сначала в «версию соединения», потому что позже она все равно конвертируется обратно в avro? Ниже вы можете найти реализацию метода poll с закомментированной точкой в коде, который я имею в виду:
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new LinkedList<SourceRecord>();
MessageExVal myValue = MessageExVal.newBuilder()
.setMessageType(“some value”)
.setMessageSource(“some other value”)
.build();
SourceRecord sr = new SourceRecord(null, null,
"test_topic",
myValue.getSchema(), //incorrect - different types
myValue);
records.add(sr);
return records;
}
Суммируйте все, мой вопрос заключается в том, как поместить myValue в topi c с использованием kafka connect SourceConnector ? Буду очень признателен за каждый совет:)