Kafka Connect - Преобразует поле переименования, только если оно существует - PullRequest
0 голосов
/ 06 августа 2020

У меня есть коннектор приемника S3 для нескольких тем (topic_a, topic_b, topic_ c), а у topic_a есть поле creat ed _date и topic_b, topic_ c имеет Creati на _Дата . Я использовал приведенный ниже transforms.RenameField.renames, чтобы переименовать поле (creat ed _date: creati on _date), но поскольку единственная тема_a имеет created_date, а другие нет, соединитель не работает .

Я хочу переместить все сообщения (из всех тем с одним коннектором) в s3 с creation_date (и переименовать created_date в creation_date, если он существует), но я не могу определить регулярное выражение или преобразователь, чтобы переименовать поле (если есть) для указанного c topi c.

   "config":{
      "connector.class":"io.confluent.connect.s3.S3SinkConnector",
      "errors.log.include.messages":"true",
      "s3.region":"eu-west-1",
      "topics.dir":"dir",
      "flush.size":"5",
      "tasks.max":"2",
      "s3.part.size":"5242880",
      "timezone":"UTC",
      "locale":"en",
      "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
      "errors.log.enable":"true",
      "s3.bucket.name":"bucket",
      "topics": "topic_a, topic_b, topic_c",
      "s3.compression.type":"gzip",
      "partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
      "name":"NAME",
      "storage.class":"io.confluent.connect.s3.storage.S3Storage",
      "key.converter.schemas.enable":"true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":"true",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"https://schemaregistry.com",
      "enhanced.avro.schema.support": "true",
      "transforms": "RenameField",
      "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.RenameField.renames": "created_date:creation_date"
   }

1 Ответ

1 голос
/ 06 августа 2020

только topic_a имеет created_date, а другие нет,

Тогда вы должны использовать отдельные соединители. Один с преобразованием и всеми разделами с полем, затем другой без преобразования.

из всех разделов с одним соединителем

Это не очень хорошо масштабируется. Вы создаете ограниченные потоки потребителей и одну группу потребителей, чтобы читать сразу по нескольким темам. Для распределения нагрузки лучше использовать несколько разъемов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...