Я использую Kafka s3 Confluent Connect для копирования данных из apache Kafka в AWS S3.
Проблема в том, что у меня есть данные Kafka в формате AVRO, который НЕ использует сериализатор Avro реестра Confluent Schema, и я не могу сменить производителя Kafka. Поэтому мне нужно десериализовать существующие данные Avro из Kafka, а затем сохранить их в формате паркета в AWS S3. Я попытался использовать AvroConverter конфлуента в качестве преобразователя значения, как это -
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost/api/v1/avro
И я получаю эту ошибку -
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dcp-all to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Насколько я понимаю, "io.confluent.connect.avro .AvroConverter "будет работать, только если данные записаны в Kafka с использованием сериализатора Avro реестра Confluent Schema, и, следовательно, я получаю эту ошибку. Поэтому мой вопрос: нужно ли в этом случае использовать универсальный c AvroConverter? И если да, как мне расширить существующий исходный код - https://github.com/confluentinc/kafka-connect-storage-cloud?
Любая помощь здесь будет оценена.