Kafka Connect topi c .key.ignore не работает должным образом - PullRequest
0 голосов
/ 02 мая 2020

Как я понял из документации kafka connect эта конфигурация должна игнорировать ключи для metricbeat и filebeat topi c, но не для сигналов тревоги. Но kafka connect не игнорирует ни один ключ.

Итак, это полностью json конфиг, который я нажимаю на kafka-connect поверх остальных

{
 "auto.create.indices.at.start": false,
 "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
 "connection.url": "http://elasticsearch:9200",
 "connection.timeout.ms": 5000,
 "read.timeout.ms": 5000,
 "tasks.max": "5",
 "topics": "filebeat,metricbeat,alarms",
 "behavior.on.null.values": "delete",
 "behavior.on.malformed.documents": "warn",
 "flush.timeout.ms":60000,
 "max.retries":42,
 "retry.backoff.ms": 100,
 "max.in.flight.requests": 5,
 "max.buffered.records":20000,
 "batch.size":4096,
 "drop.invalid.message": true,
 "schema.ignore": true,
 "topic.key.ignore": "metricbeat,filebeat",
 "key.ignore": false
 "name": "elasticsearch-ecs-connector",
 "type.name": "_doc",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false",
 "transforms":"routeTS",
 "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
 "transforms.routeTS.topic.format":"${topic}-${timestamp}",
 "transforms.routeTS.timestamp.format":"YYYY.MM.dd",
 "errors.tolerance": "all" ,
 "errors.log.enable": false ,
 "errors.log.include.messages": false,
 "errors.deadletterqueue.topic.name":"logstream-dlq",
 "errors.deadletterqueue.context.headers.enable":true ,
 "errors.deadletterqueue.topic.replication.factor": 1
}

Это запись во время запуска соединителя

[2020-05-01 21:07:49,960] INFO ElasticsearchSinkConnectorConfig values:
    auto.create.indices.at.start = false
    batch.size = 4096
    behavior.on.malformed.documents = warn
    behavior.on.null.values = delete
    compact.map.entries = true
    connection.compression = false
    connection.password = null
    connection.timeout.ms = 5000
    connection.url = [http://elasticsearch:9200]
    connection.username = null
    drop.invalid.message = true
    elastic.https.ssl.cipher.suites = null
    elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    elastic.https.ssl.endpoint.identification.algorithm = https
    elastic.https.ssl.key.password = null
    elastic.https.ssl.keymanager.algorithm = SunX509
    elastic.https.ssl.keystore.location = null
    elastic.https.ssl.keystore.password = null
    elastic.https.ssl.keystore.type = JKS
    elastic.https.ssl.protocol = TLS
    elastic.https.ssl.provider = null
    elastic.https.ssl.secure.random.implementation = null
    elastic.https.ssl.trustmanager.algorithm = PKIX
    elastic.https.ssl.truststore.location = null
    elastic.https.ssl.truststore.password = null
    elastic.https.ssl.truststore.type = JKS
    elastic.security.protocol = PLAINTEXT
    flush.timeout.ms = 60000
    key.ignore = false
    linger.ms = 1
    max.buffered.records = 20000
    max.in.flight.requests = 5
    max.retries = 42
    read.timeout.ms = 5000
    retry.backoff.ms = 100
    schema.ignore = true
    topic.index.map = []
    topic.key.ignore = [metricbeat, filebeat]
    topic.schema.ignore = []
    type.name = _doc
    write.method = insert

Iam используя Confluent Platform 5.5.0

1 Ответ

0 голосов
/ 02 мая 2020

metricbeat-2020.05.01+2+1391

Это правильно применение настройки key.ignore=true (это то, что вы указали при настройке "topic.key.ignore": "metricbeat")

Если Kafka Connect является игнорировать ключ сообщения Kafka, тогда он должен использовать что-то в качестве _id документа, записанного в Elasticsearch - поэтому он использует topic+partition+offset, что в точности так и есть.

...