Использование: confluent-5.1.0
SINK Конфигурация:
curl -X POST \
http://localhost:8083/connectors \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"name": "dbz-sink-connector-1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "dbauditt4",
"topic.index.map": "our3.platform.business:plat_index",
"topics.regex":"our3.platform.business",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"group.id":"plot",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"transforms": "timestamp_convertor",
"transforms.timestamp_convertor.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp_convertor.target.type": "string",
"transforms.timestamp_convertor.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.timestamp_convertor.field":"data.ts_ms"
}
}';
Пример сообщения в теме кафки:
{
"data": {
"before": null,
"after": {
"Id": 331458,
"business_id": 532334,
"sms_opted": 1
},
"source": {
"version": "0.7.5",
"name": "our3",
"server_id": 810143323,
"ts_sec": 1548661255,
"gtid": null,
"file": "mysql-bin-changelog.001786",
"pos": 1719980,
"row": 0,
"snapshot": false,
"thread": 11674162,
"db": "platform",
"table": "business"
},
"op": "c",
"ts_ms": 1548661255851
}
}
Соединитель вызывает исключение нулевого указателя.
java.lang.Thread.run (Thread.java:748) Вызывается: java.lang.NullPointerException в org.apache.kafka.connect.transforms.TimestampConverter.inferTimestampType (TimestampConverter.java:422) в
Может кто-нибудь, пожалуйста, помогите мне.Что я делаю неправильно ?