Реализация кастомного AvroConverter для слияния kafka-connect-s3 - PullRequest
0 голосов
/ 14 января 2020

Я использую Kafka s3 Confluent Connect для копирования данных из apache Kafka в AWS S3.

Проблема в том, что у меня есть данные Kafka в формате AVRO, который НЕ использует сериализатор Avro реестра Confluent Schema, и я не могу сменить производителя Kafka. Поэтому мне нужно десериализовать существующие данные Avro из Kafka, а затем сохранить их в формате паркета в AWS S3. Я попытался использовать AvroConverter конфлуента в качестве преобразователя значения, как это -

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro

И я получаю эту ошибку -

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Насколько я понимаю, "io.confluent.connect.avro .AvroConverter "будет работать, только если данные записаны в Kafka с использованием сериализатора Avro реестра Confluent Schema, и, следовательно, я получаю эту ошибку. Поэтому мой вопрос: нужно ли в этом случае использовать универсальный c AvroConverter? И если да, как мне расширить существующий исходный код - https://github.com/confluentinc/kafka-connect-storage-cloud?

Любая помощь здесь будет оценена.

1 Ответ

1 голос
/ 15 января 2020

Вам не нужно расширять этот репо. Вам просто нужно реализовать Converter (часть Apache Kafka), затенить его в JAR, а затем поместить его на CLASSPATH вашего сотрудника Connect, как BlueApron сделал для Protobuf

Или посмотрите, работает ли это - https://github.com/farmdawgnation/registryless-avro-converter


НЕ используется реестр Confluent Schema

Тогда что реестр Вы используете ? У каждого, кого я знаю, есть конфигурации для взаимодействия с Confluent

...