Разъем для паркетной мойки Debezium S3, приводящий к IllegalArgumentException: схема Avro должна быть записью - PullRequest
0 голосов
/ 12 апреля 2020

Я использую разъем Debezium для перемещения данных изменения с mysql на S3. Соединитель источника использует сериализацию avro, а соединитель приемника пытается выполнить запись в s3 в формате паркета.

Поскольку полезная нагрузка события изменения дебезия является сложным типом, я просто хочу получить необходимые данные (раздел «после» в полезной нагрузке данных изменения). Для этого я добавляю их в конфиг коннектора раковины. Хотя это может быть добавлено в источнике, я бы хотел, чтобы раковина развернула маркер полезной нагрузки, чтобы иметь полную полезную нагрузку дебезия (до и после детали) в брокере.

"transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"

Однако при записи в соединитель s3 возникает следующая ошибка .. IllegalArgumentException: схема Avro должна быть записью

Как мы можем это исправить, чтобы соединитель паркетной раковины s3 записывал только важные данные («после»), распаковывая полезную нагрузку avro? Кто-нибудь имел успех с этим?

SinkConnector

  "name" :"s3-parquet-connector-v1",
  "config":
  {
   "connector.class": "io.confluent.connect.s3.S3SinkConnector",
   "storage.class": "io.confluent.connect.s3.storage.S3Storage",
   "s3.bucket.name": "lakefiles",
   "topics.dir": "mydir",
   "s3.region": "us-east-1",
   "flush.size": "100",
   "rotate.schedule.interval.ms": "20000",
   "auto.register.schemas": "false",
   "tasks.max": "1",
   "s3.part.size": "5242880",
   "timezone": "UTC",
   "parquet.codec": "snappy",
   "topics.regex": "xxx\\.(.*)",
   "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
   "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
   "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
   "path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
   "partition.duration.ms":"600000",
   "locale":"en-US",
   "timezone":"EST",
    "schema.registry.url": "http://schemaregistry:8081",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter.schema.registry.url": "http://schemaregistry:8081",
     "schema.compatibility":"NONE"

  }
}

Исходный соединитель

{
  "name": "xxx-debezium-connector-avro-v2",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "mysqllocal",  
    "database.port": "3306",
    "database.user": "root",
    "database.password": "xxxxxx",
    "database.server.id": "223344",  
    "database.server.name": "mysqllocal",  
    "database.whitelist": "xxx",  
    "table.whitelist": "xxx.yyyy",
    "database.history.kafka.bootstrap.servers": "kafka:9092",  
    "database.history.kafka.topic": "schema-changes.dummy",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter.schema.registry.url": "http://schemaregistry:8081",
    "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "database.serverTimezone": "EST",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2.$3",
    "decimal.handling.mode":"double"
  }
}

SchemaRegistry: Confluent 5.0 .2

KafkaConnect: Confluent 5.4.1

...