Я использую разъем Debezium для перемещения данных изменения с mysql на S3. Соединитель источника использует сериализацию avro, а соединитель приемника пытается выполнить запись в s3 в формате паркета.
Поскольку полезная нагрузка события изменения дебезия является сложным типом, я просто хочу получить необходимые данные (раздел «после» в полезной нагрузке данных изменения). Для этого я добавляю их в конфиг коннектора раковины. Хотя это может быть добавлено в источнике, я бы хотел, чтобы раковина развернула маркер полезной нагрузки, чтобы иметь полную полезную нагрузку дебезия (до и после детали) в брокере.
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
Однако при записи в соединитель s3 возникает следующая ошибка .. IllegalArgumentException: схема Avro должна быть записью
Как мы можем это исправить, чтобы соединитель паркетной раковины s3 записывал только важные данные («после»), распаковывая полезную нагрузку avro? Кто-нибудь имел успех с этим?
SinkConnector
"name" :"s3-parquet-connector-v1",
"config":
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "lakefiles",
"topics.dir": "mydir",
"s3.region": "us-east-1",
"flush.size": "100",
"rotate.schedule.interval.ms": "20000",
"auto.register.schemas": "false",
"tasks.max": "1",
"s3.part.size": "5242880",
"timezone": "UTC",
"parquet.codec": "snappy",
"topics.regex": "xxx\\.(.*)",
"s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"partition.duration.ms":"600000",
"locale":"en-US",
"timezone":"EST",
"schema.registry.url": "http://schemaregistry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schemaregistry:8081",
"value.converter.schema.registry.url": "http://schemaregistry:8081",
"schema.compatibility":"NONE"
}
}
Исходный соединитель
{
"name": "xxx-debezium-connector-avro-v2",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysqllocal",
"database.port": "3306",
"database.user": "root",
"database.password": "xxxxxx",
"database.server.id": "223344",
"database.server.name": "mysqllocal",
"database.whitelist": "xxx",
"table.whitelist": "xxx.yyyy",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.dummy",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schemaregistry:8081",
"value.converter.schema.registry.url": "http://schemaregistry:8081",
"internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
"database.serverTimezone": "EST",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$2.$3",
"decimal.handling.mode":"double"
}
}
SchemaRegistry: Confluent 5.0 .2
KafkaConnect: Confluent 5.4.1