переименовать индекс вasticsearch с раковиной kafka - PullRequest
0 голосов
/ 15 мая 2018

Я использую следующую раковину.Проблема в том, что он устанавливает имя индексаasticsearch так же, как тема.Я хочу иметь другое имя эластичного индекса.Как я могу этого достичь.Я использую сливной 4

{
  "name": "es-sink-mysql-foobar-02",
  "config": {
    "_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",


    "_comment": "--- Elasticsearch-specific config ---",
    "_comment": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",

    "_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist  ",
    "type.name": "type.name=kafka-connect",
    "index.name": "asimtest",
    "_comment": "Which topic to stream data from into Elasticsearch",
    "topics": "mysql-foobar",

    "_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source)  you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.",
    "key.ignore": "true"
  }
}

1 Ответ

0 голосов
/ 15 мая 2018

Используйте для этого возможности Kafka Connect Single Message Transform (SMT).

Например, чтобы удалить префикс mysql-:

"_comment": "Drop the mysql- prefix from the topic name and thus Elasticsearch index name",
"transforms": "dropPrefix",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"mysql-(.*)",
"transforms.dropPrefix.replacement":"$1"

или удалить префикс, а также перенаправить сообщения на основанный на времени индекс Elasticsearch:

 "transforms":"dropPrefix,routeTS",  
 "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",  
 "transforms.dropPrefix.regex":"mysql-(.*)",  
 "transforms.dropPrefix.replacement":"$1",  
 "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",  
 "transforms.routeTS.topic.format":"kafka-${topic}-${timestamp}",  
 "transforms.routeTS.timestamp.format":"YYYYMM"

Подробнее см. https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...