Я пытаюсь проверить конфигурацию моего коннектора Kafka сластиком поиска на AWS. При этом я получил следующее исключение:
java .lang.NullPointerException при
io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:231)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.(JestElasticsearchClient.java:142)
Я использую kafka-connect 5.4.0 docker image. Вот конфигурация разъема:
{
"name": "events-elasticsearch-sink",
"config": {
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"elastic.security.protocol": "SSL",
"tasks.max": "1",
"topics": "KAFKA_TOPIC",
"key.ignore": "true",
"connection.url": "https://....aws.com",
"type.name": "mvp-audit-trail",
"name": "events-elasticsearch-sink",
"transforms":"InsertMessageTime,ConvertTimeValue,RouteTS",
"transforms.InsertMessageTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertMessageTime.timestamp.field": "connector_processing_timestamp",
"transforms.ConvertTimeValue.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimeValue.target.type":"Timestamp",
"transforms.ConvertTimeValue.field":"timestamp",
"transforms.ConvertTimeValue.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.RouteTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.RouteTS.topic.format":"${topic}-${timestamp}",
"transforms.RouteTS.timestamp.format":"yyyy-MM-dd"
}
}
Это было проверено локально и работает нормально. Тем не менее, когда я пытаюсь подключиться к AWS ES. Мы столкнулись с этой проблемой.
Служба Connect работает в модуле Pod и имеет полный доступ к Elasticsearch. Чтобы проверить доступ, я выполнил скрипт Python, указанный здесь . Сценарий Python может создавать и считывать индекс вasticsearch.
Не могли бы вы помочь с этим?