Имя таблицы уже включено в элемент source.table
. Вот пример сообщения о вставке в таблицу с именем rental
:
{
"before": null,
"after": {
"fullfillment.sakila.rental.Value": {
"rental_id": 13346,
"rental_date": 1124483301000,
"inventory_id": 4541,
"customer_id": 131,
"return_date": {
"long": 1125188901000
},
"staff_id": 2,
"last_update": "2006-02-15T21:30:53Z"
}
},
"source": {
"name": "fullfillment",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000002",
"pos": 832,
"row": 0,
"snapshot": {
"boolean": true
},
"thread": null,
"db": {
"string": "sakila"
},
"table": {
"string": "rental"
}
},
"op": "c",
"ts_ms": {
"long": 1518190060267
}
}
Если вы хотите вставить дополнительные поля, вы можете использовать InsertField$Value
Преобразование одного сообщения, пример которого вы можете увидеть в этой статье .
Редактировать:
Если вы хотите, чтобы поле находилось в другой части сообщенияу вас есть несколько вариантов.Вы можете постобработать данные с помощью Kafka Streams, чтобы реструктурировать их по своему желанию.Вы можете использовать доступное Преобразование одного сообщения, чтобы сгладить компонент after
, а затем добавить статические значения:
"transforms": "unwrap,InsertTopic,InsertSourceDetails",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field":"messagetopic",
"transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field":"messagesource",
"transforms.InsertSourceDetails.static.value":"Debezium CDC from Oracle on asgard"
Или вы можете написать свое собственное Преобразование одного сообщения, чтобы выполнить именно те изменения, которые вы хотите сделать.,