Кафка СинкРекорд на Авро Объект - PullRequest
0 голосов
/ 26 декабря 2018

Я пытаюсь собрать Kafka Sink Connector, производитель отправляет сообщения в Avro.Как преобразовать SinkRecord в объект Avro

@Override
public void put(Collection<SinkRecord> records) {

    if(records.isEmpty()) {
        System.out.println("no sink records to process for current poll operation");
        return;
    }



    for (SinkRecord sinkRecord: records) {

        GenericRecord avroRecord = (GenericRecord) sinkRecord.value();

        System.out.println("(Key) Schema>>>.");
        System.out.println(sinkRecord.keySchema().doc());
        System.out.println(sinkRecord.keySchema().getClass().getName());


        System.out.println("(Value) Schema *****");
        System.out.println(sinkRecord.valueSchema().doc());
        System.out.println(sinkRecord.valueSchema().getClass().getName());

        System.out.println("(Actual) Value ===== ");
        System.out.println(sinkRecord.value());
        System.out.println(sinkRecord.value().getClass().getName());

    }
}

Я хочу преобразовать SinkRecord в созданный объект AVRO.Также есть способ напрямую получить схему из реестра схем, чем получить схему из файла .avsc

1 Ответ

0 голосов
/ 26 декабря 2018

Итак, стандартная реализация kafka-connect позволяет указать value.converter.schema.registry.url в файле connect.properties для чтения схемы из записей avro и разрешения десериализации в GenericRecord.

value.converter.schema.registry.url=http://avro-schema-registry:8081

И вы всегда можете извлечьсхемы из вашей записи приемника, используя record.valueSchema().

Также есть утилита по слиянию.Вы можете отослать это для более подробной информации: https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java

...