Можно ли переименовать поля при запуске сообщения соединителя debezium mysql? - PullRequest
0 голосов
/ 27 марта 2019

Я настроил коннектор debezium mysql, мне нужно добавить дополнительное поле в качестве имени таблицы в полезной нагрузке.Какое изменение конфигурации мне нужно для достижения этой цели?

1 Ответ

2 голосов
/ 27 марта 2019

Имя таблицы уже включено в элемент source.table. Вот пример сообщения о вставке в таблицу с именем rental:

{
  "before": null,
  "after": {
    "fullfillment.sakila.rental.Value": {
      "rental_id": 13346,
      "rental_date": 1124483301000,
      "inventory_id": 4541,
      "customer_id": 131,
      "return_date": {
        "long": 1125188901000
      },
      "staff_id": 2,
      "last_update": "2006-02-15T21:30:53Z"
    }
  },
  "source": {
    "name": "fullfillment",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000002",
    "pos": 832,
    "row": 0,
    "snapshot": {
      "boolean": true
    },
    "thread": null,
    "db": {
      "string": "sakila"
    },
    "table": {
      "string": "rental"
    }
  },
  "op": "c",
  "ts_ms": {
    "long": 1518190060267
  }
}

Если вы хотите вставить дополнительные поля, вы можете использовать InsertField$ValueПреобразование одного сообщения, пример которого вы можете увидеть в этой статье .


Редактировать:

Если вы хотите, чтобы поле находилось в другой части сообщенияу вас есть несколько вариантов.Вы можете постобработать данные с помощью Kafka Streams, чтобы реструктурировать их по своему желанию.Вы можете использовать доступное Преобразование одного сообщения, чтобы сгладить компонент after, а затем добавить статические значения:

            "transforms": "unwrap,InsertTopic,InsertSourceDetails",
            "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
            "transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertTopic.topic.field":"messagetopic",
            "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertSourceDetails.static.field":"messagesource",
            "transforms.InsertSourceDetails.static.value":"Debezium CDC from Oracle on asgard"

Или вы можете написать свое собственное Преобразование одного сообщения, чтобы выполнить именно те изменения, которые вы хотите сделать.,

...