Создание идентификатора документа KafkaConnectasticSearch - PullRequest
0 голосов
/ 17 января 2020

В настоящее время я использую следующую конфигурацию соединителя и получаю исключение "Ключ используется в качестве идентификатора документа и не может иметь значение null"

{

    "name" :"hello7",
    "config" : {
        "name": "hello7",
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "hello7",
        "connection.url":"http://127.0.0.1:8080/",
        "type.name":"aggregator",
        "schema.ignore": "true",
        "topic.schema.ignore": "true",
        "topic.key.ignore": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false", 
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "key.ignore":"false",
        "transforms": "extractKey",
        "transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.InsertKey.fields":"customerId",
        "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKey.field":"customerId",
        "errors.log.enable":true,
        "errors.log.include.messages":true

     }
}

, и я отправляю следующее сообщение для topi c

{

  "customerId" : "i7y32o4823",
  "customerName" : "JOE",
  "address":"123 main street",
  "employee" : "ABC Company",
  "employeeAddress" : "178 Main Street"

}

Я получаю следующую ошибку

2020-01-17 16: 28: 33,624] ОШИБКА WorkerSinkTask {id = hello7-0 } Задача вызвала неисследованное и неустранимое исключение. Задача уничтожается и не восстановится, пока не будет перезапущена вручную. (org. apache .kafka.connect.runtime.WorkerSinkTask) org. apache .kafka.connect.errors.ConnectException: ключ используется в качестве идентификатора документа и не может быть пустым. в io.confluent.connect.elasticsearch.DataConverter.convertKey (DataConverter. java: 79) в io.confluent.connect.elasticsearch.DataConverter.convertRecord (DataConverter. java: 160) в io.confluent.connect.elasticsearch .ElasticsearchWriter. 1021 *: 169)

1 Ответ

0 голосов
/ 18 января 2020

Вы установили "key.ignore":"false" и упомянули только значение, которое вы отправляете в Kafka topi c.

Записи Кафки имеют ключи и значения. Если вы не укажете ключ, он будет нулевым.

Разъем приемника Elasticsearch не принимает нулевые ключи, поскольку ошибка говорит

ConnectException: ключ используется в качестве идентификатора документа, и не может быть нулевым

Кроме того, вы извлекаете только ключ в преобразовании, никогда не используя InsertKey

"transforms": "extractKey",

Вы можете отлаживать ваши соединители, используя автономный вывод FileSinkConnector на консоль

name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=hello7
...