Раковина Kafka Connect Elasticsearch документы не проиндексированы - PullRequest
0 голосов
/ 28 января 2019

Я пытаюсь настроить тест для перемещения данных из MySQL в Elasticsearch.

У меня есть докерская установка с брокером, zookeeper, connect, ksql server и cli, реестром схемы и Elasticsearch.Я использую образы докеров из конфлюэнтной версии 5.1.0 , а для Elasticsearch я использую asticsearch: 6.5.4

Я настроил JDBCСоединитель для передачи данных из MySQL в Kafka, это работает. Я вижу, что мои темы созданы и, используя ksql-cli, я вижу новые сообщения в потоке, когда я обновляю строки в MySQL.

Я также настроил соединитель приемника Elasticsearch соединитель успешно создан, и индекс в Elasticsearch также существует, но я вижу нет документов в моем индексе Elasticsearch .

Это ESКонфигурация коннектора раковины:

{
    "name": "es-connector",
    "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "connection.url": "http://es:9200",
            "type.name": "_doc",
            "topics": "test_topic",
            "drop.invalid.message": true,
            "behavior.on.null.values": "ignore",
            "behavior.on.malformed.documents": "ignore",
            "schema.ignore": true
    }
}

Это то, что я вижу, когда запрашиваю состояние коннектора раковины: curl -X GET http://connect:8083/connectors/es-connector

{
    "name": "es-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "state": "RUNNING",
            "id": 0,
            "worker_id": "connect:8083"
        }
    ],
    "type": "sink"
}

В Elasticsearch я вижу индекс http://es:9200/test_topic/_search

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

Я продолжаю делать обновления и вставки в MySQL, я вижу сообщения в потоке, используя ksql-cli, но в Elasticsearch документы не создаются.Я даже создал тему вручную, используя kafka-avro-console-producer и опубликовал сообщения, затем создал второй коннектор приемника для этой темы и тот же результат, я вижу индекс, но нет документов.

Я не вижу ошибок в kafka-connectпоэтому я не понимаю, почему не работает.Что-то не так с конфигурацией разъема?Я что-то упустил?

Редактировать:

Для конфигурации приемника Elasticsearch, которую я пробовал с и без этих строк:

"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true

И результат тот же, документов нет.

Редактировать

Я обнаружил ошибку:

Ключ используется в качестве идентификатора документа и не может быть нулевым

.

1 Ответ

0 голосов
/ 29 января 2019

При

"key.ignore": true

приемник Elasticsearch будет использовать топик + раздел + смещение в качестве идентификатора документа Elasticsearch.Как вы обнаружили, вы получите новый документ для каждого сообщения.

При

"key.ignore": false

приемник Elasticsearch будет использовать ключ сообщения Kafka в качестве идентификатора документа Elasticsearch.Если у вас нет ключа в сообщении Kafka, по понятным причинам вы получите ошибку Key is used as document id and cannot be null.Вы можете использовать различные методы для установки ключа в сообщении Kafka, включая Преобразование отдельного сообщения для установки ключа сообщения Kafka, если вы принимаете через Kafka Connect, подробно здесь .

...