У меня есть kafka topi c, который получает данные из базы данных mysql, используя исходный соединитель Debezium mysql, ниже приведен формат одного из сообщений:
{
"Message": {
"schema": {
"type": "struct",
"fields": [
...
],
"optional": true,
"name": "mysql-server-1.inventory.somename"
},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Doof",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Marry",
"email": "annek@noanswer.org"
},
"source": {
"db": "inventory",
"table": "customers",
...
"query": "Update customers set last_name = 'Marry' where id = 1004"
}
}
}
}
Я хочу столбцы pu sh ts_ms, before, after
и id
(из объекта / строки) в другую базу данных, используя соединитель приемника jdb c со схемой таблицы как (id,before(text),after(text),timestamp)
, так как новичок в kafka не может понять:
как я могу извлечь только эти поля, из сообщения в pu sh и игнорировать другие?
как я могу преобразовать до, после того, как поля в строку / Формат сериализации?
Как мне извлечь id
из объекта? (в случае операции вставки, before будет нулевым, для удаления, after будет нулевым)
Для приведенного выше сообщения таблица назначения приемника должна содержать в конце следующие данные:
id: 1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"annek@noanswer.org"}'
after: '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"annek@noanswer.org"}'
timestamp: 1465491411815