API Kafka Connect и Avro Object (SourceRecord vs org. apache .avro.Schema) - PullRequest
0 голосов
/ 04 февраля 2020

У меня проблема с отправкой объекта Avro (экземпляр org. apache .avro.specifi c .SpecificRecord) в kafka topi c с использованием соединителя источника kafka (необходимо подготовить экземпляр SourceRecord). В моем случае я предполагаю, что на основе схемы, например:

{
    "namespace": "com.model.avro.generated",
    "type": "record",
    "name": " MessageExVal",
    "version": "1",
    "fields": [
        {
            "name": "messageSource",
            "type": "string"
        },
        {
            "name": "messageSourceVersion",
            "type": [
                "string",
                "null"
            ]
        }
    ]
}

с помощью avro-maven-plugin для maven, я создам модель классов, используемых в моем проекте. Экземпляр класса MessageExVal предоставляет мне «org.apache.avro.Schema» (по методам getSchema() or getClassSchema()). Со второй стороны kafka connect api требует от меня org.apache.kafka.connect.data.Schema, чтобы иметь возможность создать новый экземпляр SourceRecord, возвращаемый методом poll() исходного соединителя. В конфигурации я предоставляю параметры:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

В коде AvroConverter в методе fromConnectData(), выполняемом после метода poll, я вижу, что преобразование из org.apache.kafka.connect.data.Schema в org.apache.avro.Schema будет выполнено. Так есть ли возможность передать схему avro без преобразования сначала в «версию соединения», потому что позже она все равно конвертируется обратно в avro? Ниже вы можете найти реализацию метода poll с закомментированной точкой в ​​коде, который я имею в виду:

@Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new LinkedList<SourceRecord>();
        MessageExVal myValue = MessageExVal.newBuilder()
                .setMessageType(“some value”)
                .setMessageSource(“some other value”)
                .build();
        SourceRecord sr = new SourceRecord(null, null,
                "test_topic",
                myValue.getSchema(), //incorrect - different types
                myValue);
        records.add(sr);
        return records;
    }

Суммируйте все, мой вопрос заключается в том, как поместить myValue в topi c с использованием kafka connect SourceConnector ? Буду очень признателен за каждый совет:)

1 Ответ

0 голосов
/ 06 февраля 2020

потому что потом все равно конвертируется обратно в avro?

Данные хранятся в двоичном виде в топи c, поэтому вам все равно придется оплачивать стоимость десериализации

API-интерфейс kafka connect требует от меня организации. apache .kafka.connect.data.Schema, чтобы иметь возможность создавать новый экземпляр SourceRecord

Да. Вы можете использовать toConnectData, чтобы получить это, или вы можете просто удалить Avro из зависимостей вашего кода и напрямую создавать экземпляры схемы и структуры из Connect.

Преобразователи отвечают за сериализацию, и Avro не требуется в подключении

...