В настоящее время я использую следующую конфигурацию соединителя и получаю исключение "Ключ используется в качестве идентификатора документа и не может иметь значение null"
{
"name" :"hello7",
"config" : {
"name": "hello7",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "hello7",
"connection.url":"http://127.0.0.1:8080/",
"type.name":"aggregator",
"schema.ignore": "true",
"topic.schema.ignore": "true",
"topic.key.ignore": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"key.ignore":"false",
"transforms": "extractKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"customerId",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"customerId",
"errors.log.enable":true,
"errors.log.include.messages":true
}
}
, и я отправляю следующее сообщение для topi c
{
"customerId" : "i7y32o4823",
"customerName" : "JOE",
"address":"123 main street",
"employee" : "ABC Company",
"employeeAddress" : "178 Main Street"
}
Я получаю следующую ошибку
2020-01-17 16: 28: 33,624] ОШИБКА WorkerSinkTask {id = hello7-0 } Задача вызвала неисследованное и неустранимое исключение. Задача уничтожается и не восстановится, пока не будет перезапущена вручную. (org. apache .kafka.connect.runtime.WorkerSinkTask) org. apache .kafka.connect.errors.ConnectException: ключ используется в качестве идентификатора документа и не может быть пустым. в io.confluent.connect.elasticsearch.DataConverter.convertKey (DataConverter. java: 79) в io.confluent.connect.elasticsearch.DataConverter.convertRecord (DataConverter. java: 160) в io.confluent.connect.elasticsearch .ElasticsearchWriter. 1021 *: 169)