У меня есть коннектор приемника S3 для нескольких тем (topic_a, topic_b, topic_ c), а у topic_a есть поле creat ed _date и topic_b, topic_ c имеет Creati на _Дата . Я использовал приведенный ниже transforms.RenameField.renames
, чтобы переименовать поле (creat ed _date: creati on _date), но поскольку единственная тема_a имеет created_date, а другие нет, соединитель не работает .
Я хочу переместить все сообщения (из всех тем с одним коннектором) в s3 с creation_date (и переименовать created_date в creation_date, если он существует), но я не могу определить регулярное выражение или преобразователь, чтобы переименовать поле (если есть) для указанного c topi c.
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"errors.log.include.messages":"true",
"s3.region":"eu-west-1",
"topics.dir":"dir",
"flush.size":"5",
"tasks.max":"2",
"s3.part.size":"5242880",
"timezone":"UTC",
"locale":"en",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"errors.log.enable":"true",
"s3.bucket.name":"bucket",
"topics": "topic_a, topic_b, topic_c",
"s3.compression.type":"gzip",
"partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
"name":"NAME",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"key.converter.schemas.enable":"true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"true",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"https://schemaregistry.com",
"enhanced.avro.schema.support": "true",
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "created_date:creation_date"
}