Я пытаюсь использовать Confluent Filter SMT с примером Debezium unwrap-smt .
Я добавил следующие конфигурации в исходный коннектор (Debezium MySQL ) config:
"transforms": "route,csFilter",
...
...
"transforms.csFilter.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.csFilter.filter.condition": "$.payload.after.source == 2",
"transforms.csFilter.filter.type": "exclude",
"transforms.csFilter.missing.or.null.behavior": "fail"
Поскольку этот SMT-фильтр предоставляется Confluent, я загрузил файл jar и скопировал (connect-transforms, connect-utils, json -path) jar файлы в каталог path-to-kafka/connect/debezium-connector-mysql
.
Когда я пытался зарегистрировать исходный коннектор Debezium MySQL,
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json"
localhost:8083/connectors/ -d @source_connector_config.json
я получил эту ошибку:
{"error_code":400,
"message":"Connector configuration is invalid and contains the following 1 error(s):\n
Invalid value $.payload.after.source == 2 for configuration filter.condition: Invalid json path defined.
Please refer to https://github.com/json-path/JsonPath README for correct use of json path.\n
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
Я проверил выражение пути JSON с примерами, приведенными в этом руководстве . Все в порядке.
Не могли бы вы указать мне правильное направление? Что мне не хватает? Спасибо.