Создание новой SourceRecord из объекта - PullRequest
1 голос
/ 04 июня 2019

Я пишу соединитель Kafka, чтобы загрузить некоторые данные из нескольких источников на Github (текстовые и yaml-файлы) и преобразовать их в объекты определенного класса, который автоматически генерируется из avsc-файла:

{
  "type": "record",
  "name": "MatomoRecord",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "type", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}

Пока все прошло успешно.Итак, теперь у меня есть Карта объектов, которую я хочу сохранить в теме Кафки.Для этого я пытаюсь создать SourceRecords:

for (Map.Entry<String, MatomoRecord> record : records.entrySet()) {
  sourceRecords.add(new SourceRecord(
    sourcePartition,
    sourceOffset,
    matomoTopic,
    0,
    org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
    record.getKey(),
    matomoSchema,
    record.getValue())
  );
}

Как определить схему значений типа org.apache.kafka.connect.data.Schema на основе схемы avro?Для теста я вручную создал схему с помощью Builder:

Schema matomoSchema = SchemaBuilder.struct()
                .name("MatomoRecord")
                .field("name", Schema.STRING_SCHEMA)
                .field("type", Schema.STRING_SCHEMA)
                .field("timestamp", Schema.INT64_SCHEMA)
                .build();

Результат был:

org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class MatomoRecord

Может ли кто-нибудь помочь мне определить схему значений на основе схемы avro?

С наилучшими пожеланиями, Мартин

Ответы [ 2 ]

0 голосов
/ 04 июня 2019

Вы не можете использовать record.getValue(), и при этом не существует прямого API от Avro для схемы подключения (без внутренних методов AvroConverter Confluent)

Вам необходимо проанализировать этот объект в Struct объект, которыйсоответствует схеме, которую вы определили (что выглядит нормально, если ни одно из ваших полей объекта не может быть нулевым)

Посмотрите на Javadoc, как вы можете определить его https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html

Примечание (не имеет значенияздесь), вложенные структуры должны быть построены «снизу вверх», где вы put дочерние структуры / массивы превращаете в родительские.

Ваш соединитель не обязательно должен зависеть от Avro, кроме как включать объекты вашей модели.Интерфейсы конвертера отвечают за преобразование вашей структуры с ее схемой в другие форматы данных (JSON, кодирование Avro Confluent, Protobuf и т. Д.)

0 голосов
/ 04 июня 2019

Схема KC - это схема JSON, которая ужасно похожа на схему Avro.Попробуйте org.apache.kafka.connect.json.JsonConverter#asConnectSchema - вам может понадобиться помассировать схему Avro, чтобы она заработала.

...